diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/README.rst b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/README.rst
new file mode 100644
index 00000000..bf9ea59b
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/README.rst
@@ -0,0 +1,217 @@
+OpenTelemetry LlamaIndex Instrumentation
+=========================================
+
+This library provides automatic instrumentation for LlamaIndex applications using OpenTelemetry.
+
+Installation
+------------
+
+Development installation::
+
+ # Install the package in editable mode
+ cd instrumentation-genai/opentelemetry-instrumentation-llamaindex
+ pip install -e .
+
+ # Install test dependencies
+ pip install -e ".[test]"
+
+ # Install util-genai (required for telemetry)
+ cd ../../util/opentelemetry-util-genai
+ pip install -e .
+
+
+Quick Start
+-----------
+
+.. code-block:: python
+
+ import os
+ from opentelemetry.instrumentation.llamaindex import LlamaindexInstrumentor
+ from opentelemetry import trace, metrics
+ from opentelemetry.sdk.trace import TracerProvider
+ from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
+ from opentelemetry.sdk.metrics import MeterProvider
+ from opentelemetry.sdk.metrics.export import InMemoryMetricReader
+
+ # Enable metrics (default is spans only)
+ os.environ["OTEL_INSTRUMENTATION_GENAI_EMITTERS"] = "span_metric"
+
+ # Setup tracing
+ trace.set_tracer_provider(TracerProvider())
+ trace.get_tracer_provider().add_span_processor(
+ SimpleSpanProcessor(ConsoleSpanExporter())
+ )
+
+ # Setup metrics
+ metric_reader = InMemoryMetricReader()
+ meter_provider = MeterProvider(metric_readers=[metric_reader])
+ metrics.set_meter_provider(meter_provider)
+
+ # Enable instrumentation with providers
+ LlamaindexInstrumentor().instrument(
+ tracer_provider=trace.get_tracer_provider(),
+ meter_provider=meter_provider
+ )
+
+ # Use LlamaIndex as normal
+ from llama_index.llms.openai import OpenAI
+ from llama_index.core.llms import ChatMessage, MessageRole
+
+ llm = OpenAI(model="gpt-3.5-turbo")
+ messages = [ChatMessage(role=MessageRole.USER, content="Hello")]
+ response = llm.chat(messages)
+
+
+Running Tests
+-------------
+
+**LLM Tests**:
+
+.. code-block:: bash
+
+ # Set environment variables
+ export OPENAI_API_KEY=your-api-key
+ export OTEL_INSTRUMENTATION_GENAI_EMITTERS=span_metric
+
+ # Run the test
+ cd tests
+ python test_llm_instrumentation.py
+
+**Embedding Tests**:
+
+.. code-block:: bash
+
+ # Set environment variables
+ export OPENAI_API_KEY=your-api-key
+ export OTEL_INSTRUMENTATION_GENAI_EMITTERS=span_metric
+
+ # Run the test
+ cd tests
+ python test_embedding_instrumentation.py
+
+
+Expected Output
+---------------
+
+**LLM Span Attributes**::
+
+ {
+ "gen_ai.framework": "llamaindex",
+ "gen_ai.request.model": "gpt-3.5-turbo",
+ "gen_ai.operation.name": "chat",
+ "gen_ai.usage.input_tokens": 24,
+ "gen_ai.usage.output_tokens": 7
+ }
+
+**Embedding Span Attributes**::
+
+ {
+ "gen_ai.operation.name": "embeddings",
+ "gen_ai.request.model": "text-embedding-3-small",
+ "gen_ai.provider.name": "openai",
+ "gen_ai.embeddings.dimension.count": 1536
+ }
+
+**Metrics**::
+
+ Metric: gen_ai.client.operation.duration
+ Duration: 0.6900 seconds
+ Count: 1
+
+ Metric: gen_ai.client.token.usage
+ Token type: input, Sum: 24, Count: 1
+ Token type: output, Sum: 7, Count: 1
+
+
+Key Implementation Differences from LangChain
+----------------------------------------------
+
+**1. Event-Based Callbacks**
+
+LlamaIndex uses ``on_event_start(event_type, ...)`` and ``on_event_end(event_type, ...)``
+instead of LangChain's method-based callbacks (``on_llm_start``, ``on_llm_end``).
+
+Event types are dispatched via ``CBEventType`` enum::
+
+ CBEventType.LLM # LLM invocations (chat, complete)
+ CBEventType.AGENT # Agent steps (not yet instrumented)
+ CBEventType.EMBEDDING # Embedding operations (get_text_embedding, get_text_embedding_batch)
+
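+A handler subscribes by implementing both hooks and dispatching on the event type.
+A simplified sketch of the pattern this instrumentation follows (error handling
+omitted)::
+
+    from llama_index.core.callbacks.base_handler import BaseCallbackHandler
+    from llama_index.core.callbacks.schema import CBEventType
+
+    class MySpanHandler(BaseCallbackHandler):
+        def __init__(self):
+            super().__init__(event_starts_to_ignore=[], event_ends_to_ignore=[])
+
+        def start_trace(self, trace_id=None):
+            pass
+
+        def end_trace(self, trace_id=None, trace_map=None):
+            pass
+
+        def on_event_start(self, event_type, payload=None, event_id="",
+                           parent_id="", **kwargs):
+            if event_type == CBEventType.LLM:
+                pass  # open a span keyed by event_id
+            return event_id
+
+        def on_event_end(self, event_type, payload=None, event_id="", **kwargs):
+            if event_type == CBEventType.LLM:
+                pass  # close the span opened for event_id
+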
+**2. Handler Registration**
+
+LlamaIndex uses ``handlers`` list::
+
+ callback_manager.handlers.append(handler)
+
+LangChain uses ``inheritable_handlers``::
+
+ callback_manager.inheritable_handlers.append(handler)
+
+**3. Response Structure**
+
+LlamaIndex ``ChatMessage`` uses ``blocks`` (list of TextBlock objects)::
+
+ message.content # Computed property from blocks[0].text
+
+LangChain uses simple strings::
+
+ message.content # Direct string property
+
+**4. Token Usage**
+
+LlamaIndex returns objects (not dicts)::
+
+ response.raw.usage.prompt_tokens # Object attribute
+ response.raw.usage.completion_tokens # Object attribute
+
+LangChain returns dicts::
+
+ response["usage"]["prompt_tokens"] # Dict key
+ response["usage"]["completion_tokens"] # Dict key
+
+
+Supported Features
+------------------
+
+**LLM Operations**
+
+* ✅ Chat completion (``llm.chat()``, ``llm.stream_chat()``)
+* ✅ Text completion (``llm.complete()``, ``llm.stream_complete()``)
+* ✅ Token usage tracking
+* ✅ Model name detection
+* ✅ Framework attribution
+
+**Embedding Operations**
+
+* ✅ Single text embedding (``embed_model.get_text_embedding()``)
+* ✅ Batch embedding (``embed_model.get_text_embedding_batch()``)
+* ✅ Query embedding (``embed_model.get_query_embedding()``)
+* ✅ Provider detection (OpenAI, Azure, AWS Bedrock, Google, Cohere, HuggingFace, Ollama, and more)
+* ✅ Dimension count tracking
+* ✅ Input text capture
+
+**Provider Detection**
+
+Embedding instrumentation automatically detects the provider from class names:
+
+* **OpenAI**: ``OpenAIEmbedding``
+* **Azure**: ``AzureOpenAIEmbedding``
+* **AWS**: ``BedrockEmbedding``
+* **Google**: ``GeminiEmbedding``, ``VertexTextEmbedding``, ``GooglePaLMEmbedding``
+* **Cohere**: ``CohereEmbedding``
+* **HuggingFace**: ``HuggingFaceEmbedding``, ``HuggingFaceInferenceAPIEmbedding``
+* **Ollama**: ``OllamaEmbedding``
+* **Anthropic**: ``AnthropicEmbedding``
+* **MistralAI**: ``MistralAIEmbedding``
+* **Together**: ``TogetherEmbedding``
+* **Fireworks**: ``FireworksEmbedding``
+* **Voyage**: ``VoyageEmbedding``
+* **Jina**: ``JinaEmbedding``
+
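+For example, the detection helper bundled with this package resolves class names
+as follows (illustrative calls; unmatched class names yield ``None``)::
+
+    from opentelemetry.instrumentation.llamaindex.vendor_detection import (
+        detect_vendor_from_class,
+    )
+
+    detect_vendor_from_class("AzureOpenAIEmbedding")  # -> "azure"
+    detect_vendor_from_class("BedrockEmbedding")      # -> "aws"
+    detect_vendor_from_class("MyCustomEmbedding")     # -> None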
+
+References
+----------
+
+* `OpenTelemetry Project <https://opentelemetry.io/>`_
+* `LlamaIndex <https://www.llamaindex.ai/>`_
+* `LlamaIndex Callbacks <https://docs.llamaindex.ai/en/stable/module_guides/observability/callbacks/>`_
diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_agent.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_agent.py
new file mode 100644
index 00000000..234fc142
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_agent.py
@@ -0,0 +1,95 @@
+import asyncio
+import os
+import sys
+
+from llama_index.core.agent import ReActAgent
+from llama_index.core.tools import FunctionTool
+from llama_index.llms.openai import OpenAI
+from llama_index.core import Settings
+
+from opentelemetry import trace, metrics
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor
+from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
+from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
+from opentelemetry.sdk.metrics import MeterProvider
+from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
+from opentelemetry.instrumentation.llamaindex import LlamaindexInstrumentor
+
+
+# 1. Setup Telemetry
+def setup_telemetry():
+ trace.set_tracer_provider(TracerProvider())
+ trace.get_tracer_provider().add_span_processor(
+ BatchSpanProcessor(OTLPSpanExporter(insecure=True))
+ )
+
+ metric_reader = PeriodicExportingMetricReader(OTLPMetricExporter(insecure=True))
+ metrics.set_meter_provider(MeterProvider(metric_readers=[metric_reader]))
+
+
+# 2. Define Tools
+def search_flights(origin: str, destination: str, date: str) -> str:
+ """Search for flights between two cities on a specific date."""
+ print(f" [Tool] Searching flights from {origin} to {destination} on {date}...")
+ return f"Flight UA123 from {origin} to {destination} on {date} costs $500."
+
+
+def search_hotels(city: str, check_in: str) -> str:
+ """Search for hotels in a city."""
+ print(f" [Tool] Searching hotels in {city} for {check_in}...")
+ return f"Hotel Grand in {city} is available for $200/night."
+
+
+def book_ticket(flight_number: str) -> str:
+ """Book a flight ticket."""
+ print(f" [Tool] Booking flight {flight_number}...")
+ return f"Confirmed booking for {flight_number}. Ticket #999."
+
+
+# 3. Main Agent Logic
+async def run_travel_planner():
+ # Check for API Key
+ if not os.getenv("OPENAI_API_KEY"):
+ print("Error: OPENAI_API_KEY environment variable is not set.")
+ sys.exit(1)
+
+ setup_telemetry()
+
+ # Instrument LlamaIndex
+ LlamaindexInstrumentor().instrument()
+
+ # Setup LLM
+ Settings.llm = OpenAI(model="gpt-4o-mini", temperature=0)
+
+ # Create Tools
+ tools = [
+ FunctionTool.from_defaults(fn=search_flights),
+ FunctionTool.from_defaults(fn=search_hotels),
+ FunctionTool.from_defaults(fn=book_ticket),
+ ]
+
+ # Create Agent
+ # ReActAgent in LlamaIndex uses the workflow engine internally
+ agent = ReActAgent(tools=tools, llm=Settings.llm, verbose=True)
+
+ # Run Workflow
+ user_request = "I want to fly from New York to Paris on 2023-12-01. Find a flight and book it, then find a hotel."
+
+ # We use the async run method which returns the handler we instrumented
+ # This triggers wrap_agent_run -> WorkflowEventInstrumentor
+ handler = agent.run(user_msg=user_request)
+ response = await handler
+
+ print(f"\nFinal Response: {response}")
+
+ # Ensure spans are flushed before exit
+ provider = trace.get_tracer_provider()
+ if hasattr(provider, "force_flush"):
+ provider.force_flush()
+ if hasattr(provider, "shutdown"):
+ provider.shutdown()
+
+
+if __name__ == "__main__":
+ asyncio.run(run_travel_planner())
diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/Dockerfile b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/Dockerfile
new file mode 100644
index 00000000..93138ff4
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/Dockerfile
@@ -0,0 +1,24 @@
+FROM python:3.12-slim
+
+WORKDIR /app
+
+# Disable telemetry during build to avoid connection errors
+ENV OTEL_SDK_DISABLED=true
+
+# Install dependencies
+COPY instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/requirements.txt .
+RUN pip install --default-timeout=100 --retries=5 -r requirements.txt
+
+# Copy and install local instrumentation package
+COPY instrumentation-genai/opentelemetry-instrumentation-llamaindex /tmp/instrumentation-llamaindex/
+RUN pip install --no-cache-dir /tmp/instrumentation-llamaindex && \
+ rm -rf /tmp/instrumentation-llamaindex
+
+# Copy application code
+COPY instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/main_server.py .
+
+# Expose port
+EXPOSE 8080
+
+# Run the server
+CMD ["python", "main_server.py"]
diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/README.md b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/README.md
new file mode 100644
index 00000000..bdae86f8
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/README.md
@@ -0,0 +1,119 @@
+# LlamaIndex Travel Planner - Kubernetes Deployment
+
+This directory contains Kubernetes deployment files for the LlamaIndex Travel Planner example application with OpenTelemetry instrumentation.
+
+## Components
+
+- **Server (`main_server.py`)**: HTTP server that exposes a `/plan` endpoint for travel planning requests using LlamaIndex ReActAgent
+- **Deployment (`deployment.yaml`)**: Kubernetes Deployment and Service configuration
+- **CronJob (`cronjob.yaml`)**: Automated load generator that sends periodic travel planning requests
+
+## Architecture
+
+```
+┌─────────────┐
+│ CronJob │ (Load Generator)
+│ (curl) │
+└──────┬──────┘
+ │ HTTP POST /plan
+ ▼
+┌─────────────────────────────┐
+│ Deployment │
+│ llamaindex-travel-planner │
+│ ┌─────────────────────┐ │
+│ │ LlamaIndex Agent │ │
+│  │  + OTEL             │    │
+│  │    Instrumentation  │    │
+│ └─────────────────────┘ │
+└──────┬──────────────────────┘
+ │ OTLP (gRPC)
+ ▼
+┌─────────────┐
+│ OTEL │
+│ Collector │
+└─────────────┘
+```
+
+## Prerequisites
+
+1. Kubernetes cluster with namespace `llamaindex-travel-planner`
+2. OpenAI API key stored as secret:
+ ```bash
+ kubectl create secret generic openai-api-keys \
+ --from-literal=openai-api-key=YOUR_API_KEY \
+     -n llamaindex-travel-planner
+ ```
+3. OpenTelemetry Collector running on cluster nodes (DaemonSet)
+
+## Building the Docker Image
+
+```bash
+# Build from the repository root (the Dockerfile copies paths relative to the repo root)
+docker build -t shuniche855/llamaindex-travel-planner:0.0.9 \
+  -f instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/Dockerfile .
+
+# Push to the registry (tag matches the image referenced in deployment.yaml)
+docker push shuniche855/llamaindex-travel-planner:0.0.9
+```
+
+## Deployment
+
+```bash
+# Deploy the server
+kubectl apply -f deployment.yaml
+
+# Deploy the load generator CronJob
+kubectl apply -f cronjob.yaml
+```
+
+## Testing
+
+### Health Check
+
+```bash
+kubectl port-forward -n llamaindex-travel-planner svc/llamaindex-travel-planner-service 8080:80
+curl http://localhost:8080/health
+```
+
+### Manual Request
+
+```bash
+curl -X POST http://localhost:8080/plan \
+ -H "Content-Type: application/json" \
+ -d '{
+ "destination": "Paris",
+ "origin": "New York",
+ "budget": 3000,
+ "duration": 5,
+ "travelers": 2,
+ "interests": ["sightseeing", "food", "culture"],
+ "departure_date": "2024-06-15"
+ }'
+```
+
+## Environment Variables
+
+Key environment variables configured in `deployment.yaml`:
+
+- `OTEL_SERVICE_NAME`: Service name for telemetry
+- `OTEL_EXPORTER_OTLP_ENDPOINT`: OTLP collector endpoint
+- `OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT`: Enable message content capture
+- `OPENAI_API_KEY`: OpenAI API key (from secret)
+
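+For local testing outside the cluster, the same variables can be exported before
+starting the server; the endpoint and service name below are placeholder values:
+
+```bash
+export OPENAI_API_KEY=sk-...
+export OTEL_SERVICE_NAME=llamaindex-travel-planner
+export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317
+export OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT=true
+python main_server.py
+```
+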
+## Monitoring
+
+The application generates:
+
+- **Traces**: LlamaIndex agent execution, LLM calls, tool invocations
+- **Metrics**: LLM token usage, latency, error rates
+- **Logs**: Application logs with trace correlation
+
+View traces in your observability platform (Splunk O11y, Jaeger, etc.)
+
+## Load Generation Schedule
+
+The CronJob runs:
+
+- **Schedule**: Every 30 minutes during business hours
+- **Days**: Monday-Friday
+- **Time**: 8am-6pm PST (16:00-02:00 UTC)
+
+Adjust the schedule in `cronjob.yaml` as needed.
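+
+For example, to run once per hour around the clock instead (a hypothetical
+alternative), change the `schedule` field in `cronjob.yaml`:
+
+```yaml
+spec:
+  schedule: "0 * * * *"  # top of every hour, every day
+```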
diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/cronjob.yaml b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/cronjob.yaml
new file mode 100644
index 00000000..6c948871
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/cronjob.yaml
@@ -0,0 +1,85 @@
+apiVersion: batch/v1
+kind: CronJob
+metadata:
+ name: llamaindex-travel-planner-load-generator
+ namespace: llamaindex-travel-planner
+spec:
+ schedule: "0,30 16-23,0-2 * * 1-5" # Every 30 min, 8am-6pm PST Mon-Fri (16:00-02:00 UTC)
+ jobTemplate:
+ spec:
+ template:
+ spec:
+ containers:
+ - name: load-generator
+ image: curlimages/curl:latest
+ command:
+ - /bin/sh
+ - -c
+ - |
+ echo "Generating LlamaIndex travel planning requests..."
+
+ # Travel destinations and parameters
+ destinations="Tokyo Paris NewYork Sydney Barcelona Dubai London Singapore Rome Amsterdam"
+ origins="Boston Chicago Seattle LosAngeles Miami Denver Atlanta Portland"
+ budgets="2000 3000 1500 4000 2500 3500"
+ durations="3 5 7 10 14"
+
+ # Interest combinations
+ interests_1="sightseeing food culture"
+ interests_2="adventure shopping food"
+ interests_3="culture sightseeing"
+ interests_4="food adventure shopping"
+
+ # Generate a random request
+ RAND=$(date +%s)
+ dest=$(echo $destinations | cut -d' ' -f$((($RAND % 10) + 1)))
+ origin=$(echo $origins | cut -d' ' -f$((($RAND % 8) + 1)))
+ budget=$(echo $budgets | cut -d' ' -f$((($RAND % 6) + 1)))
+ duration=$(echo $durations | cut -d' ' -f$((($RAND % 5) + 1)))
+ travelers=$((($RAND % 4) + 1))
+
+ # Select random interests
+ case $(($RAND % 4)) in
+ 0) interests=$interests_1;;
+ 1) interests=$interests_2;;
+ 2) interests=$interests_3;;
+ *) interests=$interests_4;;
+ esac
+
+ # Convert interests to JSON array
+ interest_json=$(echo $interests | awk '{
+ printf "["
+ for(i=1; i<=NF; i++) {
+ if(i>1) printf ","
+ printf "\"%s\"", $i
+ }
+ printf "]"
+ }')
+
+ echo "Planning trip:"
+ echo " From: $origin"
+ echo " To: $dest"
+ echo " Budget: \$$budget"
+ echo " Duration: $duration days"
+ echo " Travelers: $travelers"
+ echo " Interests: $interests"
+
+ # Make request to the LlamaIndex travel planner service
+ curl -X POST http://llamaindex-travel-planner-service.llamaindex-travel-planner.svc.cluster.local/plan \
+ -H "Content-Type: application/json" \
+ -d "{
+ \"destination\": \"$dest\",
+ \"origin\": \"$origin\",
+ \"budget\": $budget,
+ \"duration\": $duration,
+ \"travelers\": $travelers,
+ \"interests\": $interest_json,
+ \"departure_date\": \"2024-06-15\"
+ }" \
+ --max-time 600 || echo "Request failed"
+
+ echo "Load generation completed"
+ restartPolicy: OnFailure
+ backoffLimit: 3
+  successfulJobsHistoryLimit: 3
+  failedJobsHistoryLimit: 3
diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/deployment.yaml b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/deployment.yaml
new file mode 100644
index 00000000..77a11bf8
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/deployment.yaml
@@ -0,0 +1,126 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: llamaindex-travel-planner
+ namespace: llamaindex-travel-planner
+ labels:
+ app: llamaindex-travel-planner
+spec:
+ replicas: 1
+ selector:
+ matchLabels:
+ app: llamaindex-travel-planner
+ template:
+ metadata:
+ labels:
+ app: llamaindex-travel-planner
+ spec:
+ containers:
+ - name: travel-planner
+ image: shuniche855/llamaindex-travel-planner:0.0.9
+ imagePullPolicy: Always
+ ports:
+ - containerPort: 8080
+ name: http
+ env:
+ # Python unbuffered output for real-time logging
+ - name: PYTHONUNBUFFERED
+ value: "1"
+ # Load OpenAI API key from secret
+ - name: OPENAI_API_KEY
+ valueFrom:
+ secretKeyRef:
+ name: openai-api-keys
+ key: openai-api-key
+ # Enable OTEL SDK (disabled in Dockerfile during build)
+ - name: OTEL_SDK_DISABLED
+ value: "false"
+ # Service Name
+ - name: OTEL_SERVICE_NAME
+ value: "opentelemetry-python-llamaindex-travel-planner"
+ # Additional OTEL configuration
+ - name: OTEL_RESOURCE_ATTRIBUTES
+ value: "deployment.environment=o11y-inframon-ai"
+ - name: SPLUNK_OTEL_AGENT
+ valueFrom:
+ fieldRef:
+ fieldPath: status.hostIP
+ - name: OTEL_EXPORTER_OTLP_ENDPOINT
+ value: "http://$(SPLUNK_OTEL_AGENT):4317"
+ - name: OTEL_EXPORTER_OTLP_PROTOCOL
+ value: "grpc"
+ - name: HOME
+ value: "/tmp"
+ - name: OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE
+ value: "DELTA"
+ - name: OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED
+ value: "true"
+ - name: OTEL_PYTHON_EXCLUDED_URLS
+ value: "^(https?://)?[^/]+(/health)?$"
+ - name: OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT
+ value: "true"
+ - name: OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT_MODE
+ value: "SPAN_AND_EVENT"
+ - name: OTEL_INSTRUMENTATION_GENAI_EVALS_RESULTS_AGGREGATION
+ value: "true"
+ - name: OTEL_INSTRUMENTATION_GENAI_EMITTERS
+ value: "span_metric_event,splunk"
+ - name: OTEL_INSTRUMENTATION_GENAI_EMITTERS_EVALUATION
+ value: "replace-category:SplunkEvaluationResults"
+ - name: OTEL_GENAI_EVAL_DEBUG_SKIPS
+ value: "true"
+ - name: OTEL_GENAI_EVAL_DEBUG_EACH
+ value: "false"
+ - name: OTEL_INSTRUMENTATION_GENAI_DEBUG
+ value: "true"
+ - name: SPLUNK_PROFILER_ENABLED
+ value: "true"
+ # Set evaluation wait time to 10 seconds (short enough to avoid health check timeout)
+ - name: EVAL_WAIT_SECONDS
+ value: "10"
+ - name: PORT
+ value: "8080"
+ resources:
+ requests:
+ memory: "512Mi"
+ cpu: "250m"
+ limits:
+ memory: "2Gi"
+ cpu: "1000m"
+ securityContext:
+ allowPrivilegeEscalation: false
+ runAsNonRoot: true
+ runAsUser: 1000
+ capabilities:
+ drop:
+ - ALL
+ readOnlyRootFilesystem: false
+ livenessProbe:
+ httpGet:
+ path: /health
+ port: 8080
+ initialDelaySeconds: 30
+ periodSeconds: 30
+ timeoutSeconds: 10
+ failureThreshold: 10
+ readinessProbe:
+ httpGet:
+ path: /health
+ port: 8080
+ initialDelaySeconds: 5
+ periodSeconds: 5
+ restartPolicy: Always
+---
+apiVersion: v1
+kind: Service
+metadata:
+ name: llamaindex-travel-planner-service
+ namespace: llamaindex-travel-planner
+spec:
+ selector:
+ app: llamaindex-travel-planner
+ ports:
+ - name: http
+ port: 80
+ targetPort: 8080
+ type: ClusterIP
diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/main_server.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/main_server.py
new file mode 100644
index 00000000..bb6d2a9f
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/main_server.py
@@ -0,0 +1,331 @@
+"""
+Travel Planner Server using LlamaIndex ReActAgent.
+
+This server exposes an HTTP endpoint for travel planning requests and uses
+OpenTelemetry instrumentation to capture traces and metrics.
+"""
+
+import asyncio
+import json
+import os
+from datetime import datetime
+from http.server import BaseHTTPRequestHandler, HTTPServer
+from typing import Dict, Any
+
+from llama_index.core.agent import ReActAgent
+from llama_index.core.tools import FunctionTool
+from llama_index.llms.openai import OpenAI
+from llama_index.core import Settings
+
+from opentelemetry import trace, metrics
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor
+from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
+from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
+from opentelemetry.sdk.metrics import MeterProvider
+from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
+from opentelemetry.instrumentation.llamaindex import LlamaindexInstrumentor
+
+
+# Setup Telemetry
+def setup_telemetry():
+ """Initialize OpenTelemetry tracing and metrics.
+
+ Service name and OTLP endpoint are configured via environment variables:
+ - OTEL_SERVICE_NAME
+ - OTEL_EXPORTER_OTLP_ENDPOINT
+ """
+ trace.set_tracer_provider(TracerProvider())
+ trace.get_tracer_provider().add_span_processor(
+ BatchSpanProcessor(OTLPSpanExporter())
+ )
+
+ metric_reader = PeriodicExportingMetricReader(OTLPMetricExporter())
+ metrics.set_meter_provider(MeterProvider(metric_readers=[metric_reader]))
+
+
+# Define Travel Planning Tools
+def search_flights(origin: str, destination: str, date: str) -> str:
+ """Search for flights between two cities on a specific date."""
+ print(f" [Tool] Searching flights from {origin} to {destination} on {date}...")
+
+ # Simulate flight search results
+ flight_price = 800
+ return (
+ f"Found Flight UA{abs(hash(origin + destination)) % 1000}: "
+ f"{origin} → {destination} on {date}, "
+ f"Price: ${flight_price}, "
+ f"Departure: 10:00 AM, Arrival: 2:00 PM"
+ )
+
+
+def search_hotels(city: str, check_in: str, check_out: str) -> str:
+ """Search for hotels in a city for given check-in and check-out dates."""
+ print(f" [Tool] Searching hotels in {city} from {check_in} to {check_out}...")
+
+ # Simulate hotel search results
+ nightly_rate = 200
+ return (
+ f"Found Hotel Grand {city}: "
+ f"Available from {check_in} to {check_out}, "
+ f"Rate: ${nightly_rate}/night, "
+ f"Rating: 4.5/5, Amenities: WiFi, Breakfast, Pool"
+ )
+
+
+def search_activities(city: str) -> str:
+ """Search for activities and attractions in a city."""
+ print(f" [Tool] Searching activities in {city}...")
+
+ activities = [
+ f"City Tour of {city} - $50",
+ f"Food Tour in {city} - $80",
+ f"Museum Pass for {city} - $40",
+ ]
+
+ return f"Recommended activities: {', '.join(activities)}"
+
+
+# Global agent instances
+_flight_agent = None
+_hotel_agent = None
+_activity_agent = None
+
+
+def get_flight_agent():
+ """Get or create the flight search agent."""
+ global _flight_agent
+ if _flight_agent is None:
+ Settings.llm = OpenAI(model="gpt-4o-mini", temperature=0)
+ tools = [FunctionTool.from_defaults(fn=search_flights)]
+ system_prompt = "You are a flight search specialist. Use the search_flights tool to find flights, then provide the result."
+ _flight_agent = ReActAgent(
+ tools=tools,
+ llm=Settings.llm,
+ verbose=True,
+ system_prompt=system_prompt
+ )
+ return _flight_agent
+
+
+def get_hotel_agent():
+ """Get or create the hotel search agent."""
+ global _hotel_agent
+ if _hotel_agent is None:
+ Settings.llm = OpenAI(model="gpt-4o-mini", temperature=0)
+ tools = [FunctionTool.from_defaults(fn=search_hotels)]
+ system_prompt = "You are a hotel search specialist. Use the search_hotels tool to find hotels, then provide the result."
+ _hotel_agent = ReActAgent(
+ tools=tools,
+ llm=Settings.llm,
+ verbose=True,
+ system_prompt=system_prompt
+ )
+ return _hotel_agent
+
+
+def get_activity_agent():
+ """Get or create the activity search agent."""
+ global _activity_agent
+ if _activity_agent is None:
+ Settings.llm = OpenAI(model="gpt-4o-mini", temperature=0)
+ tools = [FunctionTool.from_defaults(fn=search_activities)]
+ system_prompt = "You are an activity recommendation specialist. Use the search_activities tool to find activities, then provide the result."
+ _activity_agent = ReActAgent(
+ tools=tools,
+ llm=Settings.llm,
+ verbose=True,
+ system_prompt=system_prompt
+ )
+ return _activity_agent
+
+
+class TravelPlannerHandler(BaseHTTPRequestHandler):
+ """HTTP request handler for travel planning."""
+
+ def do_GET(self):
+ """Handle GET requests (health check)."""
+ if self.path == '/health':
+ self.send_response(200)
+ self.send_header('Content-type', 'application/json')
+ self.end_headers()
+ self.wfile.write(json.dumps({'status': 'healthy'}).encode())
+ else:
+ self.send_response(404)
+ self.end_headers()
+
+ def do_POST(self):
+ """Handle POST requests for travel planning."""
+ if self.path == '/plan':
+ # Create a root span for the HTTP request
+ tracer = trace.get_tracer(__name__)
+ with tracer.start_as_current_span(
+ "POST /plan",
+ kind=trace.SpanKind.SERVER,
+ attributes={
+ "http.method": "POST",
+ "http.target": "/plan",
+ "http.scheme": "http",
+ }
+ ) as span:
+ try:
+ content_length = int(self.headers['Content-Length'])
+ post_data = self.rfile.read(content_length)
+ request_data = json.loads(post_data.decode('utf-8'))
+
+ # Extract parameters
+ destination = request_data.get('destination', 'Paris')
+ origin = request_data.get('origin', 'New York')
+ budget = request_data.get('budget', 3000)
+ duration = request_data.get('duration', 5)
+ travelers = request_data.get('travelers', 2)
+ interests = request_data.get('interests', ['sightseeing', 'food'])
+ departure_date = request_data.get('departure_date', '2024-06-01')
+
+ print(f"\n{'='*60}")
+ print(f"New Travel Planning Request")
+ print(f"{'='*60}")
+ print(f"Destination: {destination}")
+ print(f"Origin: {origin}")
+ print(f"Budget: ${budget}")
+ print(f"Duration: {duration} days")
+ print(f"Travelers: {travelers}")
+ print(f"Interests: {', '.join(interests)}")
+ print(f"{'='*60}\n")
+
+ # Calculate check-out date
+ from datetime import datetime, timedelta
+ check_in = datetime.strptime(departure_date, "%Y-%m-%d")
+ check_out = check_in + timedelta(days=duration)
+ check_out_date = check_out.strftime("%Y-%m-%d")
+
+ # Run agents sequentially (like LangChain multi-agent approach)
+ async def run_agents():
+ results = []
+
+ # 1. Flight specialist agent
+ print("\n--- Flight Specialist Agent ---")
+ flight_agent = get_flight_agent()
+ flight_query = f"Search for flights from {origin} to {destination} departing on {departure_date}"
+ flight_handler = flight_agent.run(user_msg=flight_query, max_iterations=3)
+ flight_response = await flight_handler
+ results.append(f"Flights: {flight_response}")
+
+ # 2. Hotel specialist agent
+ print("\n--- Hotel Specialist Agent ---")
+ hotel_agent = get_hotel_agent()
+ hotel_query = f"Search for hotels in {destination} from {departure_date} to {check_out_date}"
+ hotel_handler = hotel_agent.run(user_msg=hotel_query, max_iterations=3)
+ hotel_response = await hotel_handler
+ results.append(f"Hotels: {hotel_response}")
+
+ # 3. Activity specialist agent
+ print("\n--- Activity Specialist Agent ---")
+ activity_agent = get_activity_agent()
+ activity_query = f"Recommend activities in {destination}"
+ activity_handler = activity_agent.run(user_msg=activity_query, max_iterations=3)
+ activity_response = await activity_handler
+ results.append(f"Activities: {activity_response}")
+
+ return "\n\n".join(results)
+
+ try:
+ result = asyncio.run(run_agents())
+ except RuntimeError as e:
+ if "asyncio.run() cannot be called from a running event loop" in str(e):
+ # Fallback for when event loop is already running
+ loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(loop)
+ try:
+ result = loop.run_until_complete(run_agents())
+ finally:
+ loop.close()
+ else:
+ raise
+
+ print(f"\n{'='*60}")
+ print(f"Planning Complete")
+ print(f"{'='*60}\n")
+
+ # Send response
+ self.send_response(200)
+ self.send_header('Content-type', 'application/json')
+ self.end_headers()
+
+ response_data = {
+ 'status': 'success',
+ 'request': request_data,
+ 'plan': result,
+ 'timestamp': datetime.utcnow().isoformat()
+ }
+
+ self.wfile.write(json.dumps(response_data, indent=2).encode())
+ span.set_attribute("http.status_code", 200)
+
+ except Exception as e:
+ print(f"Error processing request: {e}")
+ import traceback
+ traceback.print_exc()
+
+ span.set_attribute("http.status_code", 500)
+ span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
+
+ self.send_response(500)
+ self.send_header('Content-type', 'application/json')
+ self.end_headers()
+ self.wfile.write(json.dumps({
+ 'status': 'error',
+ 'error': str(e)
+ }).encode())
+ else:
+ self.send_response(404)
+ self.end_headers()
+
+ def log_message(self, format, *args):
+ """Override to customize logging."""
+ print(f"{self.address_string()} - {format % args}")
+
+
+def main():
+ """Start the travel planner server."""
+ # Check for API Key
+ if not os.getenv("OPENAI_API_KEY"):
+ print("Error: OPENAI_API_KEY environment variable is not set.")
+ return 1
+
+ # Setup telemetry
+ setup_telemetry()
+
+ # Auto-instrument LlamaIndex (captures telemetry automatically via callbacks)
+ LlamaindexInstrumentor().instrument()
+
+ # Start HTTP server
+ port = int(os.getenv("PORT", "8080"))
+ server = HTTPServer(('0.0.0.0', port), TravelPlannerHandler)
+
+ print(f"\n{'='*60}")
+ print(f"Travel Planner Server Starting")
+ print(f"{'='*60}")
+ print(f"Port: {port}")
+ print(f"Health check: http://localhost:{port}/health")
+ print(f"Planning endpoint: POST http://localhost:{port}/plan")
+ print(f"{'='*60}\n")
+
+ try:
+ server.serve_forever()
+ except KeyboardInterrupt:
+ print("\nShutting down server...")
+ server.shutdown()
+
+ # Flush telemetry
+ provider = trace.get_tracer_provider()
+ if hasattr(provider, 'force_flush'):
+ provider.force_flush()
+ if hasattr(provider, 'shutdown'):
+ provider.shutdown()
+
+ return 0
+
+
+if __name__ == "__main__":
+ exit(main())
diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/requirements.txt b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/requirements.txt
new file mode 100644
index 00000000..27af852a
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/requirements.txt
@@ -0,0 +1,5 @@
+llama-index-core>=0.14.0
+llama-index-llms-openai>=0.6.0
+opentelemetry-api>=1.27.0
+opentelemetry-sdk>=1.27.0
+opentelemetry-exporter-otlp-proto-grpc>=1.27.0
diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/pyproject.toml b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/pyproject.toml
new file mode 100644
index 00000000..55a67080
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/pyproject.toml
@@ -0,0 +1,58 @@
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[project]
+name = "splunk-otel-instrumentation-llamaindex"
+dynamic = ["version"]
+description = "OpenTelemetry LlamaIndex instrumentation"
+readme = "README.rst"
+license = "Apache-2.0"
+requires-python = ">=3.9"
+authors = [
+ { name = "OpenTelemetry Authors", email = "cncf-opentelemetry-contributors@lists.cncf.io" },
+]
+classifiers = [
+ "Development Status :: 4 - Beta",
+ "Intended Audience :: Developers",
+ "License :: OSI Approved :: Apache Software License",
+ "Programming Language :: Python",
+ "Programming Language :: Python :: 3.9",
+ "Programming Language :: Python :: 3.10",
+ "Programming Language :: Python :: 3.11",
+ "Programming Language :: Python :: 3.12",
+ "Programming Language :: Python :: 3.13",
+]
+dependencies = [
+ "opentelemetry-api ~= 1.38.0.dev0",
+ "opentelemetry-instrumentation ~= 0.59b0.dev0",
+ "opentelemetry-semantic-conventions ~= 0.59b0.dev0",
+ "splunk-otel-util-genai>=0.1.4",
+]
+
+[project.optional-dependencies]
+instruments = ["llama-index-core >= 0.14.0"]
+test = [
+ "llama-index-core >= 0.14.0",
+ "llama-index-llms-openai >= 0.6.0",
+ "pytest >= 7.0.0",
+]
+
+[project.entry-points.opentelemetry_instrumentor]
+llamaindex = "opentelemetry.instrumentation.llamaindex:LlamaindexInstrumentor"
+
+[project.urls]
+Homepage = "https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/instrumentation-genai/opentelemetry-instrumentation-llamaindex"
+Repository = "https://github.com/open-telemetry/opentelemetry-python-contrib"
+
+[tool.hatch.version]
+path = "src/opentelemetry/instrumentation/llamaindex/version.py"
+
+[tool.hatch.build.targets.sdist]
+include = ["/src", "/tests"]
+
+[tool.hatch.build.targets.wheel]
+packages = ["src/opentelemetry"]
+
+[tool.ruff]
+exclude = ["./"]
diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/__init__.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/__init__.py
new file mode 100644
index 00000000..64a2836f
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/__init__.py
@@ -0,0 +1,79 @@
+from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
+from opentelemetry.util.genai.handler import get_telemetry_handler
+from opentelemetry.instrumentation.llamaindex.config import Config
+from opentelemetry.instrumentation.llamaindex.callback_handler import (
+ LlamaindexCallbackHandler,
+)
+from opentelemetry.instrumentation.llamaindex.workflow_instrumentation import (
+ wrap_agent_run,
+)
+from wrapt import wrap_function_wrapper
+
+_instruments = ("llama-index-core >= 0.14.0",)
+
+
+class LlamaindexInstrumentor(BaseInstrumentor):
+ def __init__(
+ self,
+ exception_logger=None,
+ disable_trace_context_propagation=False,
+ use_legacy_attributes: bool = True,
+ ):
+ super().__init__()
+        Config.exception_logger = exception_logger
+ Config.use_legacy_attributes = use_legacy_attributes
+ self._disable_trace_context_propagation = disable_trace_context_propagation
+ self._telemetry_handler = None
+
+ def instrumentation_dependencies(self):
+ return _instruments
+
+ def _instrument(self, **kwargs):
+ tracer_provider = kwargs.get("tracer_provider")
+ meter_provider = kwargs.get("meter_provider")
+ logger_provider = kwargs.get("logger_provider")
+
+ self._telemetry_handler = get_telemetry_handler(
+ tracer_provider=tracer_provider,
+ meter_provider=meter_provider,
+ logger_provider=logger_provider,
+ )
+
+        callback_handler = LlamaindexCallbackHandler(
+            telemetry_handler=self._telemetry_handler
+        )
+
+        wrap_function_wrapper(
+            module="llama_index.core.callbacks.base",
+            name="CallbackManager.__init__",
+            wrapper=_BaseCallbackManagerInitWrapper(callback_handler),
+        )
+
+ # Instrument Workflow-based agents
+ try:
+ wrap_function_wrapper(
+ module="llama_index.core.agent",
+ name="ReActAgent.run",
+ wrapper=wrap_agent_run,
+ )
+ except Exception:
+ # ReActAgent might not be available or importable
+ pass
+
+ def _uninstrument(self, **kwargs):
+ pass
+
+
+class _BaseCallbackManagerInitWrapper:
+ def __init__(self, callback_handler: "LlamaindexCallbackHandler"):
+ self._callback_handler = callback_handler
+
+ def __call__(self, wrapped, instance, args, kwargs) -> None:
+ wrapped(*args, **kwargs)
+ # LlamaIndex uses 'handlers' instead of 'inheritable_handlers'
+ for handler in instance.handlers:
+ if isinstance(handler, type(self._callback_handler)):
+ break
+ else:
+ self._callback_handler._callback_manager = instance
+ instance.add_handler(self._callback_handler)
diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py
new file mode 100644
index 00000000..07ab2642
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py
@@ -0,0 +1,524 @@
+from typing import Any, Dict, Optional
+
+from llama_index.core.callbacks.base_handler import BaseCallbackHandler
+from llama_index.core.callbacks.schema import CBEventType
+
+from opentelemetry import trace
+from opentelemetry.util.genai.handler import TelemetryHandler
+from opentelemetry.util.genai.types import (
+ AgentInvocation,
+ EmbeddingInvocation,
+ InputMessage,
+ LLMInvocation,
+ OutputMessage,
+ Text,
+ ToolCall,
+)
+
+from .vendor_detection import detect_vendor_from_class
+
+
+def _safe_str(value: Any) -> str:
+ """Safely convert value to string."""
+ try:
+ return str(value)
+ except (TypeError, ValueError):
+ return ""
+
+
+class LlamaindexCallbackHandler(BaseCallbackHandler):
+ """LlamaIndex callback handler supporting LLM and Embedding instrumentation."""
+
+ def __init__(
+ self,
+ telemetry_handler: Optional[TelemetryHandler] = None,
+ ) -> None:
+ super().__init__(
+ event_starts_to_ignore=[],
+ event_ends_to_ignore=[],
+ )
+ self._handler = telemetry_handler
+
+ def start_trace(self, trace_id: Optional[str] = None) -> None:
+ """Start a trace - required by BaseCallbackHandler."""
+ pass
+
+ def end_trace(
+ self,
+ trace_id: Optional[str] = None,
+ trace_map: Optional[Dict[str, Any]] = None,
+ ) -> None:
+ """End a trace - required by BaseCallbackHandler."""
+ pass
+
+ def on_event_start(
+ self,
+ event_type: CBEventType,
+ payload: Optional[Dict[str, Any]] = None,
+ event_id: str = "",
+ parent_id: str = "",
+ **kwargs: Any,
+ ) -> str:
+ """Handle event start - processing LLM, EMBEDDING, AGENT_STEP, and FUNCTION_CALL events."""
+ if event_type == CBEventType.LLM:
+ self._handle_llm_start(event_id, parent_id, payload, **kwargs)
+ elif event_type == CBEventType.EMBEDDING:
+ self._handle_embedding_start(event_id, parent_id, payload, **kwargs)
+ elif event_type == CBEventType.AGENT_STEP:
+ self._handle_agent_step_start(event_id, parent_id, payload, **kwargs)
+ elif event_type == CBEventType.FUNCTION_CALL:
+ self._handle_function_call_start(event_id, parent_id, payload, **kwargs)
+ return event_id
+
+ def on_event_end(
+ self,
+ event_type: CBEventType,
+ payload: Optional[Dict[str, Any]] = None,
+ event_id: str = "",
+ **kwargs: Any,
+ ) -> None:
+ """Handle event end - processing LLM, EMBEDDING, AGENT_STEP, and FUNCTION_CALL events."""
+ if event_type == CBEventType.LLM:
+ self._handle_llm_end(event_id, payload, **kwargs)
+ elif event_type == CBEventType.EMBEDDING:
+ self._handle_embedding_end(event_id, payload, **kwargs)
+ elif event_type == CBEventType.AGENT_STEP:
+ self._handle_agent_step_end(event_id, payload, **kwargs)
+ elif event_type == CBEventType.FUNCTION_CALL:
+ self._handle_function_call_end(event_id, payload, **kwargs)
+
+ def _handle_llm_start(
+ self,
+ event_id: str,
+ parent_id: str,
+ payload: Optional[Dict[str, Any]] = None,
+ **kwargs: Any,
+ ) -> None:
+ """Handle LLM invocation start."""
+ if not self._handler or not payload:
+ return
+
+ # Extract model information from payload
+ serialized = payload.get("serialized", {})
+ model_name = (
+ serialized.get("model") or serialized.get("model_name") or "unknown"
+ )
+
+ # Extract messages from payload
+ # LlamaIndex messages are ChatMessage objects with .content and .role properties
+ messages = payload.get("messages", [])
+ input_messages = []
+
+ for msg in messages:
+ # Handle ChatMessage objects (has .content property and .role attribute)
+ if hasattr(msg, "content") and hasattr(msg, "role"):
+ # Extract role - could be MessageRole enum
+ role_value = (
+ str(msg.role.value) if hasattr(msg.role, "value") else str(msg.role)
+ )
+ # Extract content - this is a property that pulls from blocks[0].text
+ content = _safe_str(msg.content)
+ input_messages.append(
+ InputMessage(role=role_value, parts=[Text(content=content)])
+ )
+ elif isinstance(msg, dict):
+ # Handle serialized messages (dict format)
+ role = msg.get("role", "user")
+ # Try to extract from blocks first (LlamaIndex format)
+ blocks = msg.get("blocks", [])
+ if blocks and isinstance(blocks[0], dict):
+ content = blocks[0].get("text", "")
+ else:
+ # Fallback to direct content field
+ content = msg.get("content", "")
+
+ role_value = str(role.value) if hasattr(role, "value") else str(role)
+ input_messages.append(
+ InputMessage(
+ role=role_value,
+ parts=[Text(content=_safe_str(content))],
+ )
+ )
+
+ # Create LLM invocation with event_id as run_id
+ llm_inv = LLMInvocation(
+ request_model=_safe_str(model_name),
+ input_messages=input_messages,
+ attributes={},
+ run_id=event_id, # Use event_id as run_id for registry lookup
+ )
+ llm_inv.framework = "llamaindex"
+
+ # Get the currently active span to establish parent-child relationship
+ # First try to get from active agent context (workflow-based agents)
+ parent_span = None
+ if self._handler._agent_context_stack:
+ # Get the current agent's span from the span registry
+ _, agent_run_id = self._handler._agent_context_stack[-1]
+ parent_span = self._handler._span_registry.get(agent_run_id)
+
+ # Fallback to OpenTelemetry context if no agent span found
+ if not parent_span:
+ current_span = trace.get_current_span()
+ if current_span and current_span.is_recording():
+ parent_span = current_span
+
+ if parent_span:
+ llm_inv.parent_span = parent_span
+
+ # Start the LLM invocation (handler stores it in _entity_registry)
+ self._handler.start_llm(llm_inv)
+
+ def _handle_llm_end(
+ self,
+ event_id: str,
+ payload: Optional[Dict[str, Any]],
+ **kwargs: Any,
+ ) -> None:
+ """Handle LLM invocation end."""
+ if not self._handler:
+ return
+
+ # Get the LLM invocation from handler's registry using event_id
+ llm_inv = self._handler.get_entity(event_id)
+ if not llm_inv or not isinstance(llm_inv, LLMInvocation):
+ return
+
+ if payload:
+ # Extract response from payload
+ response = payload.get("response")
+
+ # Handle both dict and object types for response
+ if response:
+ # Get message - could be dict or object
+ if isinstance(response, dict):
+ message = response.get("message", {})
+ raw_response = response.get("raw")
+ else:
+ # response is a ChatResponse object
+ message = getattr(response, "message", None)
+ raw_response = getattr(response, "raw", None)
+
+ # Extract content from message
+ if message:
+ if isinstance(message, dict):
+ # Message is dict
+ blocks = message.get("blocks", [])
+ if blocks and isinstance(blocks[0], dict):
+ content = blocks[0].get("text", "")
+ else:
+ content = message.get("content", "")
+ else:
+ # Message is ChatMessage object
+ blocks = getattr(message, "blocks", [])
+ if blocks and len(blocks) > 0:
+ content = getattr(blocks[0], "text", "")
+ else:
+ content = getattr(message, "content", "")
+
+ # Create output message
+ llm_inv.output_messages = [
+ OutputMessage(
+ role="assistant",
+ parts=[Text(content=_safe_str(content))],
+ finish_reason="stop",
+ )
+ ]
+
+ # Extract token usage from response.raw (OpenAI format)
+ # LlamaIndex stores the raw API response (e.g., OpenAI response) in response.raw
+ # raw_response could be a dict or an object (e.g., ChatCompletion from OpenAI)
+ if raw_response:
+ # Try to get usage from dict or object
+ if isinstance(raw_response, dict):
+ usage = raw_response.get("usage", {})
+ else:
+ # It's an object, try to get usage attribute
+ usage = getattr(raw_response, "usage", None)
+
+ if usage:
+ # usage could also be dict or object
+ if isinstance(usage, dict):
+ llm_inv.input_tokens = usage.get("prompt_tokens")
+ llm_inv.output_tokens = usage.get("completion_tokens")
+ else:
+ llm_inv.input_tokens = getattr(usage, "prompt_tokens", None)
+ llm_inv.output_tokens = getattr(
+ usage, "completion_tokens", None
+ )
+
+ # Stop the LLM invocation
+ self._handler.stop_llm(llm_inv)
+
+ def _handle_embedding_start(
+ self,
+ event_id: str,
+ parent_id: str,
+ payload: Optional[Dict[str, Any]] = None,
+ **kwargs: Any,
+ ) -> None:
+ """Handle embedding invocation start."""
+ if not self._handler or not payload:
+ return
+
+ # Extract model information from payload
+ serialized = payload.get("serialized", {})
+ model_name = (
+ serialized.get("model_name") or serialized.get("model") or "unknown"
+ )
+
+ # Detect provider from class name
+ class_name = serialized.get("class_name", "")
+ provider = detect_vendor_from_class(class_name)
+
+ # Note: input texts are not available at start time in LlamaIndex
+ # They will be available in the end event payload
+
+ # Create embedding invocation with event_id as run_id
+ emb_inv = EmbeddingInvocation(
+ request_model=_safe_str(model_name),
+ input_texts=[], # Will be populated on end event
+ provider=provider,
+ attributes={},
+ run_id=event_id,
+ )
+ emb_inv.framework = "llamaindex"
+
+ # Get the currently active span to establish parent-child relationship
+ # First try to get from active agent context (workflow-based agents)
+ parent_span = None
+ if self._handler._agent_context_stack:
+ # Get the current agent's span from the span registry
+ _, agent_run_id = self._handler._agent_context_stack[-1]
+ parent_span = self._handler._span_registry.get(agent_run_id)
+
+ # Fallback to OpenTelemetry context if no agent span found
+ if not parent_span:
+ current_span = trace.get_current_span()
+ if current_span and current_span.is_recording():
+ parent_span = current_span
+
+ if parent_span:
+ emb_inv.parent_span = parent_span
+
+ # Start the embedding invocation
+ self._handler.start_embedding(emb_inv)
+
+ def _handle_embedding_end(
+ self,
+ event_id: str,
+ payload: Optional[Dict[str, Any]],
+ **kwargs: Any,
+ ) -> None:
+ """Handle embedding invocation end."""
+ if not self._handler:
+ return
+
+ # Get the embedding invocation from handler's registry using event_id
+ emb_inv = self._handler.get_entity(event_id)
+ if not emb_inv or not isinstance(emb_inv, EmbeddingInvocation):
+ return
+
+ if payload:
+ # Extract input chunks (texts) from response
+ # chunks is the list of input texts that were embedded
+ chunks = payload.get("chunks", [])
+ if chunks:
+ emb_inv.input_texts = [_safe_str(chunk) for chunk in chunks]
+
+ # Extract embedding vectors from response
+ # embeddings is the list of output vectors
+ embeddings = payload.get("embeddings", [])
+
+ # Determine dimension from first embedding vector
+ if embeddings and len(embeddings) > 0:
+ first_embedding = embeddings[0]
+ if isinstance(first_embedding, list):
+ emb_inv.dimension_count = len(first_embedding)
+ elif hasattr(first_embedding, "__len__"):
+ emb_inv.dimension_count = len(first_embedding)
+
+ # Stop the embedding invocation
+ self._handler.stop_embedding(emb_inv)
+
+ def _find_nearest_agent(
+ self, parent_id: Optional[str]
+ ) -> Optional[AgentInvocation]:
+ """Walk up parent chain to find the nearest agent invocation."""
+ if not self._handler:
+ return None
+ current_id = parent_id
+ while current_id:
+ entity = self._handler.get_entity(current_id)
+ if isinstance(entity, AgentInvocation):
+ return entity
+ # Move to parent
+ current_id = getattr(entity, "parent_run_id", None)
+ if current_id:
+ current_id = str(current_id)
+ return None
+
+ def _handle_agent_step_start(
+ self,
+ event_id: str,
+ parent_id: str,
+ payload: Optional[Dict[str, Any]] = None,
+ **kwargs: Any,
+ ) -> None:
+ """Handle agent step start - create AgentInvocation span."""
+ if not self._handler or not payload:
+ return
+
+ # Extract agent information from payload
+ task_id = payload.get("task_id", "")
+ input_text = payload.get("input")
+ step = payload.get("step") # TaskStep object with agent metadata
+
+ # Extract agent metadata from step or payload
+ agent_name = None
+ agent_type = None
+ agent_description = None
+ model_name = None
+
+ if step and hasattr(step, "step_state"):
+ # Try to get agent from step state
+ step_state = step.step_state
+ if hasattr(step_state, "agent"):
+ agent = step_state.agent
+ agent_name = getattr(agent, "name", None)
+ agent_type = getattr(agent, "agent_type", None) or type(agent).__name__
+ agent_description = getattr(agent, "description", None)
+ # Try to get model from agent's LLM
+ if hasattr(agent, "llm"):
+ llm = agent.llm
+ model_name = getattr(llm, "model", None) or getattr(
+ llm, "model_name", None
+ )
+
+ # Create AgentInvocation for the agent execution
+ agent_invocation = AgentInvocation(
+ name=f"agent.task.{task_id}" if task_id else "agent.invoke",
+ run_id=event_id,
+ parent_run_id=parent_id if parent_id else None,
+ input_context=input_text if input_text else "",
+ attributes={},
+ )
+ agent_invocation.framework = "llamaindex"
+
+ # Set enhanced metadata
+ if agent_name:
+ agent_invocation.agent_name = _safe_str(agent_name)
+ if agent_type:
+ agent_invocation.agent_type = _safe_str(agent_type)
+ if agent_description:
+ agent_invocation.description = _safe_str(agent_description)
+ if model_name:
+ agent_invocation.model = _safe_str(model_name)
+
+ self._handler.start_agent(agent_invocation)
+
+ def _handle_agent_step_end(
+ self,
+ event_id: str,
+ payload: Optional[Dict[str, Any]],
+ **kwargs: Any,
+ ) -> None:
+ """Handle agent step end."""
+ if not self._handler:
+ return
+
+ agent_invocation = self._handler.get_entity(event_id)
+ if not agent_invocation or not isinstance(agent_invocation, AgentInvocation):
+ return
+
+ if payload:
+ # Extract response/output if available
+ response = payload.get("response")
+ if response:
+ agent_invocation.output_result = _safe_str(response)
+
+ # Stop the agent invocation
+ self._handler.stop_agent(agent_invocation)
+
+ def _handle_function_call_start(
+ self,
+ event_id: str,
+ parent_id: str,
+ payload: Optional[Dict[str, Any]] = None,
+ **kwargs: Any,
+ ) -> None:
+ """Handle function/tool call start."""
+ if not self._handler or not payload:
+ return
+
+ # Extract tool information
+ tool = payload.get("tool")
+ if not tool:
+ return
+
+ tool_name = (
+ getattr(tool, "name", "unknown_tool")
+ if hasattr(tool, "name")
+ else "unknown_tool"
+ )
+ tool_description = (
+ getattr(tool, "description", "") if hasattr(tool, "description") else ""
+ )
+
+ # Extract function arguments
+ function_call = payload.get("function_call", {})
+ arguments = function_call if function_call else {}
+
+ # Find nearest agent for context propagation
+ context_agent = self._find_nearest_agent(parent_id) if parent_id else None
+
+ # Create ToolCall entity
+ tool_call = ToolCall(
+ name=tool_name,
+ arguments=arguments,
+ id=event_id,
+ )
+
+ # Set attributes
+ tool_call.attributes = {
+ "tool.description": tool_description,
+ }
+ tool_call.run_id = event_id # type: ignore[attr-defined]
+ tool_call.parent_run_id = parent_id if parent_id else None # type: ignore[attr-defined]
+ tool_call.framework = "llamaindex" # type: ignore[attr-defined]
+
+ # Propagate agent context to tool call
+ if context_agent:
+ agent_name = getattr(context_agent, "agent_name", None) or getattr(
+ context_agent, "name", None
+ )
+ if agent_name:
+ tool_call.agent_name = _safe_str(agent_name) # type: ignore[attr-defined]
+ tool_call.agent_id = str(context_agent.run_id) # type: ignore[attr-defined]
+
+ # Start the tool call
+ self._handler.start_tool_call(tool_call)
+
+ def _handle_function_call_end(
+ self,
+ event_id: str,
+ payload: Optional[Dict[str, Any]],
+ **kwargs: Any,
+ ) -> None:
+ """Handle function/tool call end."""
+ if not self._handler:
+ return
+
+ tool_call = self._handler.get_entity(event_id)
+ if not tool_call or not isinstance(tool_call, ToolCall):
+ return
+
+ if payload:
+ # Extract tool output/result
+ tool_output = payload.get("tool_output")
+ if tool_output:
+ # Store the result as response
+ tool_call.response = _safe_str(tool_output) # type: ignore[attr-defined]
+
+ # Stop the tool call
+ self._handler.stop_tool_call(tool_call)
diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/config.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/config.py
new file mode 100644
index 00000000..44199c03
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/config.py
@@ -0,0 +1,3 @@
+class Config:
+ exception_logger = None
+ use_legacy_attributes = True
diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/vendor_detection.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/vendor_detection.py
new file mode 100644
index 00000000..3feeaee9
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/vendor_detection.py
@@ -0,0 +1,123 @@
+"""Vendor detection for LlamaIndex embedding providers."""
+
+from dataclasses import dataclass
+from typing import List, Optional, Set
+
+
+@dataclass(frozen=True)
+class VendorRule:
+ """Rule for detecting vendor from LlamaIndex class names."""
+
+ exact_matches: Set[str]
+ patterns: List[str]
+ vendor_name: str
+
+ def matches(self, class_name: str) -> bool:
+ """Check if class name matches this vendor rule."""
+ if class_name in self.exact_matches:
+ return True
+ class_lower = class_name.lower()
+ return any(pattern in class_lower for pattern in self.patterns)
+
+
+def _get_vendor_rules() -> List[VendorRule]:
+ """
+ Get vendor detection rules ordered by specificity (most specific first).
+
+ Returns:
+ List of VendorRule objects for detecting embedding vendors from class names
+ """
+ return [
+ VendorRule(
+ exact_matches={"AzureOpenAIEmbedding"},
+ patterns=["azure"],
+ vendor_name="azure",
+ ),
+ VendorRule(
+ exact_matches={"OpenAIEmbedding"},
+ patterns=["openai"],
+ vendor_name="openai",
+ ),
+ VendorRule(
+ exact_matches={"BedrockEmbedding"},
+ patterns=["bedrock", "aws"],
+ vendor_name="aws",
+ ),
+ VendorRule(
+ exact_matches={
+ "VertexTextEmbedding",
+ "GeminiEmbedding",
+ "GooglePaLMEmbedding",
+ },
+ patterns=["vertex", "google", "palm", "gemini"],
+ vendor_name="google",
+ ),
+ VendorRule(
+ exact_matches={"CohereEmbedding"},
+ patterns=["cohere"],
+ vendor_name="cohere",
+ ),
+ VendorRule(
+ exact_matches={"HuggingFaceEmbedding", "HuggingFaceInferenceAPIEmbedding"},
+ patterns=["huggingface"],
+ vendor_name="huggingface",
+ ),
+ VendorRule(
+ exact_matches={"OllamaEmbedding"},
+ patterns=["ollama"],
+ vendor_name="ollama",
+ ),
+ VendorRule(
+ exact_matches={"AnthropicEmbedding"},
+ patterns=["anthropic"],
+ vendor_name="anthropic",
+ ),
+ VendorRule(
+ exact_matches={"MistralAIEmbedding"},
+ patterns=["mistral"],
+ vendor_name="mistralai",
+ ),
+ VendorRule(
+ exact_matches={"TogetherEmbedding"},
+ patterns=["together"],
+ vendor_name="together",
+ ),
+ VendorRule(
+ exact_matches={"FireworksEmbedding"},
+ patterns=["fireworks"],
+ vendor_name="fireworks",
+ ),
+ VendorRule(
+ exact_matches={"VoyageEmbedding"},
+ patterns=["voyage"],
+ vendor_name="voyage",
+ ),
+ VendorRule(
+ exact_matches={"JinaEmbedding"},
+ patterns=["jina"],
+ vendor_name="jina",
+ ),
+ ]
+
+
+def detect_vendor_from_class(class_name: str) -> Optional[str]:
+ """
+ Detect vendor from LlamaIndex embedding class name.
+ Uses unified detection rules combining exact matches and patterns.
+
+ Args:
+ class_name: The class name from serialized embedding information
+
+ Returns:
+        Vendor string (lowercase), or None if no match is found
+ """
+ if not class_name:
+ return None
+
+ vendor_rules = _get_vendor_rules()
+
+ for rule in vendor_rules:
+ if rule.matches(class_name):
+ return rule.vendor_name
+
+ return None
diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/version.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/version.py
new file mode 100644
index 00000000..3dc1f76b
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/version.py
@@ -0,0 +1 @@
+__version__ = "0.1.0"
diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/workflow_instrumentation.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/workflow_instrumentation.py
new file mode 100644
index 00000000..f3898e5f
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/workflow_instrumentation.py
@@ -0,0 +1,249 @@
+"""
+Workflow-based agent instrumentation for LlamaIndex.
+
+This module provides instrumentation for Workflow-based agents (ReActAgent, etc.)
+by intercepting workflow event streams to capture agent steps and tool calls.
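+
+A minimal usage sketch (illustrative; telemetry_handler is assumed to be a
+configured util-genai TelemetryHandler, workflow_handler is the handler returned
+by agent.run(), and user_msg is the user's initial message)::
+
+    instrumentor = WorkflowEventInstrumentor(telemetry_handler)
+    await instrumentor.instrument_workflow_handler(workflow_handler, user_msg)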
+"""
+
+import asyncio
+from uuid import uuid4
+
+from opentelemetry.util.genai.handler import TelemetryHandler
+from opentelemetry.util.genai.types import AgentInvocation, ToolCall
+
+
+class WorkflowEventInstrumentor:
+ """Instrumentor that wraps WorkflowHandler to capture agent and tool events."""
+
+ def __init__(self, handler: TelemetryHandler):
+ self._handler = handler
+        self._active_agents = {}  # agent run_id -> AgentInvocation
+ self._active_tools = {} # tool_id -> ToolCall
+
+ async def instrument_workflow_handler(self, workflow_handler, initial_message: str):
+ """
+ Instrument a WorkflowHandler by streaming its events and creating telemetry spans.
+
+ Args:
+ workflow_handler: The WorkflowHandler returned by agent.run()
+ initial_message: The user's initial message to the agent
+ """
+ from llama_index.core.agent.workflow.workflow_events import (
+ AgentInput,
+ AgentOutput,
+ ToolCall as WorkflowToolCall,
+ ToolCallResult,
+ )
+
+ agent_invocation = None
+ agent_run_id = None
+
+ try:
+ async for event in workflow_handler.stream_events():
+ # Agent step start
+ if isinstance(event, AgentInput):
+ # Start a new agent invocation
+ agent_run_id = str(uuid4())
+ agent_invocation = AgentInvocation(
+ name=f"agent.{event.current_agent_name}",
+ run_id=agent_run_id,
+ input_context=str(event.input)
+ if hasattr(event, "input") and event.input
+ else initial_message,
+ attributes={},
+ )
+ agent_invocation.framework = "llamaindex"
+ agent_invocation.agent_name = event.current_agent_name
+
+ self._handler.start_agent(agent_invocation)
+ self._active_agents[agent_run_id] = agent_invocation
+
+ # Tool call start
+ elif isinstance(event, WorkflowToolCall):
+ tool_call = ToolCall(
+ arguments=event.tool_kwargs,
+ name=event.tool_name,
+ id=event.tool_id,
+ attributes={},
+ )
+ tool_call.framework = "llamaindex"
+
+ # Associate with current agent if available
+ if agent_invocation:
+ tool_call.agent_name = agent_invocation.agent_name
+ tool_call.agent_id = str(agent_invocation.run_id)
+ # Set parent_span explicitly - the agent span is the parent of this tool
+ if hasattr(agent_invocation, "span") and agent_invocation.span:
+ tool_call.parent_span = agent_invocation.span
+
+ self._handler.start_tool_call(tool_call)
+ self._active_tools[event.tool_id] = tool_call
+
+ # Tool call end
+ elif isinstance(event, ToolCallResult):
+ tool_call = self._active_tools.get(event.tool_id)
+ if tool_call:
+ # Extract result
+ result = event.tool_output
+ if hasattr(result, "content"):
+ tool_call.result = str(result.content)
+ else:
+ tool_call.result = str(result)
+
+ self._handler.stop_tool_call(tool_call)
+ del self._active_tools[event.tool_id]
+
+ # Agent step end (when no more tools to call)
+ elif isinstance(event, AgentOutput):
+ # Check if this is the final output (no tool calls)
+ if not event.tool_calls and agent_invocation:
+ # Extract response
+ if hasattr(event.response, "content"):
+ agent_invocation.output_result = str(event.response.content)
+ else:
+ agent_invocation.output_result = str(event.response)
+
+ self._handler.stop_agent(agent_invocation)
+ if agent_run_id:
+ del self._active_agents[agent_run_id]
+ agent_invocation = None
+ agent_run_id = None
+
+        except Exception as e:
+            from opentelemetry.util.genai.types import Error
+
+            # Clean up any active spans on error
+            error = Error(message=str(e), type=type(e))
+            for tool_call in list(self._active_tools.values()):
+                self._handler.fail_tool_call(tool_call, error)
+            self._active_tools.clear()
+
+            if agent_invocation:
+                self._handler.fail_agent(agent_invocation, error)
+                if agent_run_id:
+                    del self._active_agents[agent_run_id]
+
+            raise
+
+
+def wrap_agent_run(wrapped, instance, args, kwargs):
+ """
+ Wrap agent.run() to instrument workflow events.
+
+ This creates a root agent span immediately when agent.run() is called,
+ ensuring all LLM and tool calls inherit the same trace context.
+
+ The root span is pushed onto the agent_context_stack, which allows the
+ callback handler to retrieve it when LLM/embedding events occur.
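+
+    Registration sketch (illustrative; the actual wiring lives elsewhere in this
+    package -- a wrapt-style wrapper like this is typically attached with
+    wrapt.wrap_function_wrapper, and the module path below is hypothetical)::
+
+        from wrapt import wrap_function_wrapper
+
+        wrap_function_wrapper(
+            "llama_index.core.agent.workflow",  # hypothetical module path
+            "ReActAgent.run",
+            wrap_agent_run,
+        )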
+ """
+ # Get the initial user message
+ user_msg = kwargs.get("user_msg") or (args[0] if args else "")
+
+ # Get TelemetryHandler from callback handler if available
+ from llama_index.core import Settings
+ from opentelemetry.util.genai.types import AgentInvocation
+    from opentelemetry import context
+
+ telemetry_handler = None
+ for callback_handler in Settings.callback_manager.handlers:
+ if hasattr(callback_handler, "_handler"):
+ telemetry_handler = callback_handler._handler
+ break
+
+ # Create a root agent span immediately to ensure all subsequent calls
+ # (LLM, tools) inherit this trace context
+ root_agent = None
+ parent_context = None
+ if telemetry_handler:
+ # Create root agent invocation before workflow starts
+ root_agent = AgentInvocation(
+ name=f"agent.{type(instance).__name__}",
+ run_id=str(uuid4()),
+ input_context=str(user_msg),
+ attributes={},
+ )
+ root_agent.framework = "llamaindex"
+ root_agent.agent_name = type(instance).__name__
+
+ # Start the root agent span immediately
+ # This pushes (agent_name, run_id) onto the _agent_context_stack
+ # and stores the span in _span_registry[run_id]
+ telemetry_handler.start_agent(root_agent)
+
+ # Capture the current context (which includes the active span)
+ # so we can propagate it to async tasks
+ parent_context = context.get_current()
+
+ # Call the original run() method to get the workflow handler
+ handler = wrapped(*args, **kwargs)
+
+ if telemetry_handler and root_agent and parent_context:
+ # Create workflow instrumentor for detailed step tracking
+ instrumentor = WorkflowEventInstrumentor(telemetry_handler)
+
+ # Wrap the handler to close the root span when the workflow completes
+ original_handler = handler
+
+ class InstrumentedHandler:
+ """Wrapper that closes the root agent span when workflow completes."""
+
+ def __init__(self, original, root_span_agent, ctx):
+ self._original = original
+ self._root_agent = root_span_agent
+ self._result = None
+ self._parent_context = ctx
+
+ def __await__(self):
+ # Start background task to instrument workflow events
+ async def stream_events():
+ try:
+ # Attach the parent context before processing workflow events
+ token = context.attach(self._parent_context)
+ try:
+ await instrumentor.instrument_workflow_handler(
+ self._original, str(user_msg)
+ )
+ finally:
+ context.detach(token)
+ except Exception as e:
+ import logging
+
+ logger = logging.getLogger(__name__)
+                        logger.warning("Error instrumenting workflow events: %s", e)
+
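+                # Fire-and-forget: the event-streaming task runs concurrently
+                # with the awaited workflow below and finishes on its own.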
+ asyncio.create_task(stream_events())
+
+ # Wait for the actual workflow to complete and return the result
+ return self._await_impl().__await__()
+
+ async def _await_impl(self):
+ """Actual async implementation."""
+ # Attach the parent context to ensure proper span hierarchy
+ token = context.attach(self._parent_context)
+ try:
+ self._result = await self._original
+ self._root_agent.output_result = str(self._result)
+ telemetry_handler.stop_agent(self._root_agent)
+ except Exception as e:
+ from opentelemetry.util.genai.types import Error
+
+ telemetry_handler.fail_agent(
+ self._root_agent, Error(message=str(e), type=type(e))
+ )
+ raise
+ finally:
+ context.detach(token)
+ return self._result
+
+ def __getattr__(self, name):
+ # Delegate all other attributes to the original handler
+ return getattr(self._original, name)
+
+ handler = InstrumentedHandler(original_handler, root_agent, parent_context)
+
+ return handler
diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_embedding_instrumentation.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_embedding_instrumentation.py
new file mode 100644
index 00000000..9828bb58
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_embedding_instrumentation.py
@@ -0,0 +1,151 @@
+"""Test embedding instrumentation for LlamaIndex."""
+
+import os
+
+from llama_index.core import Settings
+from llama_index.core.callbacks import CallbackManager
+from llama_index.embeddings.openai import OpenAIEmbedding
+from opentelemetry import metrics, trace
+from opentelemetry.sdk.metrics import MeterProvider
+from opentelemetry.sdk.metrics.export import InMemoryMetricReader
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
+
+from opentelemetry.instrumentation.llamaindex import LlamaindexInstrumentor
+
+
+# Global setup - shared across tests
+metric_reader = None
+instrumentor = None
+
+
+def setup_telemetry():
+ """Setup OpenTelemetry with span and metric exporters (once)."""
+ global metric_reader, instrumentor
+
+ if metric_reader is not None:
+ return metric_reader
+
+ # Enable metrics
+ os.environ["OTEL_INSTRUMENTATION_GENAI_EMITTERS"] = "span_metric"
+
+ # Setup tracing
+ trace.set_tracer_provider(TracerProvider())
+ trace.get_tracer_provider().add_span_processor(
+ SimpleSpanProcessor(ConsoleSpanExporter())
+ )
+
+ # Setup metrics with InMemoryMetricReader
+ metric_reader = InMemoryMetricReader()
+ meter_provider = MeterProvider(metric_readers=[metric_reader])
+ metrics.set_meter_provider(meter_provider)
+
+ # Enable instrumentation once
+ instrumentor = LlamaindexInstrumentor()
+ instrumentor.instrument(
+ tracer_provider=trace.get_tracer_provider(),
+ meter_provider=metrics.get_meter_provider(),
+ )
+
+ return metric_reader
+
+
+def test_embedding_single_text():
+ """Test single text embedding instrumentation."""
+ print("\nTest: Single Text Embedding")
+ print("=" * 60)
+
+ metric_reader = setup_telemetry()
+
+ # Configure embedding model
+ embed_model = OpenAIEmbedding(
+ model="text-embedding-3-small",
+ api_key=os.environ.get("OPENAI_API_KEY"),
+ )
+ Settings.embed_model = embed_model
+
+ # Make sure callback manager is initialized
+ if Settings.callback_manager is None:
+ Settings.callback_manager = CallbackManager()
+
+ # Generate single embedding
+ text = "LlamaIndex is a data framework for LLM applications"
+ embedding = embed_model.get_text_embedding(text)
+
+ print(f"\nText: {text}")
+ print(f"Embedding dimension: {len(embedding)}")
+ print(f"First 5 values: {embedding[:5]}")
+
+ # Validate metrics
+ print("\nMetrics:")
+ metrics_data = metric_reader.get_metrics_data()
+ for resource_metric in metrics_data.resource_metrics:
+ for scope_metric in resource_metric.scope_metrics:
+ for metric in scope_metric.metrics:
+ print(f"\nMetric: {metric.name}")
+ for data_point in metric.data.data_points:
+ if hasattr(data_point, "bucket_counts"):
+ # Histogram
+ print(f" Count: {sum(data_point.bucket_counts)}")
+ else:
+ # Counter
+ print(f" Value: {data_point.value}")
+
+ print("\nTest completed successfully")
+
+
+def test_embedding_batch():
+ """Test batch embedding instrumentation."""
+ print("\nTest: Batch Embeddings")
+ print("=" * 60)
+
+ metric_reader = setup_telemetry()
+
+ # Configure embedding model
+ embed_model = OpenAIEmbedding(
+ model="text-embedding-3-small",
+ api_key=os.environ.get("OPENAI_API_KEY"),
+ )
+ Settings.embed_model = embed_model
+
+ # Make sure callback manager is initialized
+ if Settings.callback_manager is None:
+ Settings.callback_manager = CallbackManager()
+
+ # Generate batch embeddings
+ texts = [
+ "Paris is the capital of France",
+ "Berlin is the capital of Germany",
+ "Rome is the capital of Italy",
+ ]
+ embeddings = embed_model.get_text_embedding_batch(texts)
+
+ print(f"\nEmbedded {len(embeddings)} texts")
+ print(f"Dimension: {len(embeddings[0])}")
+
+ # Validate metrics
+ print("\nMetrics:")
+ metrics_data = metric_reader.get_metrics_data()
+ for resource_metric in metrics_data.resource_metrics:
+ for scope_metric in resource_metric.scope_metrics:
+ for metric in scope_metric.metrics:
+ print(f"\nMetric: {metric.name}")
+ for data_point in metric.data.data_points:
+ if hasattr(data_point, "bucket_counts"):
+ # Histogram
+ print(f" Count: {sum(data_point.bucket_counts)}")
+ else:
+ # Counter
+ print(f" Value: {data_point.value}")
+
+ print("\nTest completed successfully")
+
+
+if __name__ == "__main__":
+ test_embedding_single_text()
+ print("\n" + "=" * 60 + "\n")
+ test_embedding_batch()
+
+ # Cleanup
+ if instrumentor:
+ instrumentor.uninstrument()
diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_llm_instrumentation.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_llm_instrumentation.py
new file mode 100644
index 00000000..3081a15c
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_llm_instrumentation.py
@@ -0,0 +1,187 @@
+"""Tests for LlamaIndex LLM instrumentation with OpenTelemetry."""
+
+import os
+
+from llama_index.core.llms import ChatMessage, MessageRole
+from llama_index.core.llms.mock import MockLLM
+from opentelemetry import metrics, trace
+from opentelemetry.instrumentation.llamaindex import LlamaindexInstrumentor
+from opentelemetry.sdk.metrics import MeterProvider
+from opentelemetry.sdk.metrics.export import InMemoryMetricReader
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import (
+ ConsoleSpanExporter,
+ SimpleSpanProcessor,
+)
+from opentelemetry.semconv._incubating.metrics import gen_ai_metrics
+
+
+def setup_telemetry():
+ """Setup OpenTelemetry with both trace and metrics exporters."""
+ # Setup tracing
+ trace.set_tracer_provider(TracerProvider())
+ tracer_provider = trace.get_tracer_provider()
+ tracer_provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
+
+ # Setup metrics with InMemoryMetricReader
+ metric_reader = InMemoryMetricReader()
+ meter_provider = MeterProvider(metric_readers=[metric_reader])
+ metrics.set_meter_provider(meter_provider)
+
+ return tracer_provider, meter_provider, metric_reader
+
+
+def test_with_openai():
+ """Test with real OpenAI API - requires OPENAI_API_KEY environment variable."""
+ from llama_index.llms.openai import OpenAI
+
+ print("=" * 80)
+ print("Testing with OpenAI API")
+ print("=" * 80)
+
+ llm = OpenAI(model="gpt-3.5-turbo")
+ messages = [
+ ChatMessage(role=MessageRole.SYSTEM, content="You are a helpful assistant."),
+ ChatMessage(role=MessageRole.USER, content="Say hello in exactly 5 words"),
+ ]
+
+ response = llm.chat(messages)
+ print(f"\nResponse: {response.message.content}")
+
+ if hasattr(response, "raw") and response.raw:
+ if isinstance(response.raw, dict):
+ usage = response.raw.get("usage", {})
+ else:
+ usage = getattr(response.raw, "usage", None)
+
+ if usage:
+ if isinstance(usage, dict):
+ prompt_tokens = usage.get("prompt_tokens")
+ completion_tokens = usage.get("completion_tokens")
+ total_tokens = usage.get("total_tokens")
+ else:
+ prompt_tokens = getattr(usage, "prompt_tokens", None)
+ completion_tokens = getattr(usage, "completion_tokens", None)
+ total_tokens = getattr(usage, "total_tokens", None)
+
+ print(
+ f"\nToken Usage: input={prompt_tokens}, output={completion_tokens}, total={total_tokens}"
+ )
+
+ print("=" * 80)
+
+
+class MockLLMWithUsage(MockLLM):
+    """MockLLM subclass intended to carry fake usage data for testing.
+
+    Currently a pass-through: MockLLM creates the ChatResponse after
+    _complete() returns, so usage data cannot easily be injected here.
+    """
+
+    def _complete(self, prompt, **kwargs):
+        """Delegate to MockLLM._complete without injecting usage data."""
+        return super()._complete(prompt, **kwargs)
+
+
+def test_with_mock():
+ """Test with MockLLM - no API key needed."""
+ print("=" * 80)
+ print("Testing with MockLLM")
+ print("=" * 80)
+
+ llm = MockLLM(max_tokens=50)
+ messages = [
+ ChatMessage(role=MessageRole.SYSTEM, content="You are a helpful assistant."),
+ ChatMessage(role=MessageRole.USER, content="Say hello in 5 words"),
+ ]
+
+ response = llm.chat(messages)
+ print(f"\nResponse: {response.message.content[:100]}...")
+ print("=" * 80)
+
+
+def test_message_extraction():
+ """Test message extraction."""
+ print("\n" + "=" * 80)
+ print("Testing message extraction")
+ print("=" * 80)
+
+ llm = MockLLM(max_tokens=20)
+ messages = [
+ ChatMessage(role=MessageRole.SYSTEM, content="You are helpful."),
+ ChatMessage(role=MessageRole.USER, content="Test message"),
+ ]
+
+ response = llm.chat(messages)
+ print(f"\nResponse: {response.message.content[:50]}...")
+ print("=" * 80)
+
+
+if __name__ == "__main__":
+ # Enable metrics emission
+ os.environ["OTEL_INSTRUMENTATION_GENAI_EMITTERS"] = "span_metric"
+
+ # Setup telemetry
+ tracer_provider, meter_provider, metric_reader = setup_telemetry()
+
+ # Instrument LlamaIndex
+ instrumentor = LlamaindexInstrumentor()
+ instrumentor.instrument(
+ tracer_provider=tracer_provider, meter_provider=meter_provider
+ )
+ print("LlamaIndex instrumentation enabled\n")
+
+ # Run tests
+ if os.environ.get("OPENAI_API_KEY"):
+ print("Testing with real OpenAI API\n")
+ test_with_openai()
+ else:
+ print("Testing with MockLLM (set OPENAI_API_KEY to test real API)\n")
+ test_with_mock()
+
+ # Test message extraction
+ test_message_extraction()
+
+ # Check metrics
+ print("\n" + "=" * 80)
+ print("Metrics Summary")
+ print("=" * 80)
+
+ metrics_data = metric_reader.get_metrics_data()
+ found_duration = False
+ found_token_usage = False
+
+ if metrics_data:
+ for rm in getattr(metrics_data, "resource_metrics", []) or []:
+ for scope in getattr(rm, "scope_metrics", []) or []:
+ for metric in getattr(scope, "metrics", []) or []:
+ print(f"\nMetric: {metric.name}")
+
+ if metric.name == gen_ai_metrics.GEN_AI_CLIENT_OPERATION_DURATION:
+ found_duration = True
+ dps = getattr(metric.data, "data_points", [])
+ if dps:
+ print(f" Duration: {dps[0].sum:.4f} seconds")
+ print(f" Count: {dps[0].count}")
+
+ if metric.name == gen_ai_metrics.GEN_AI_CLIENT_TOKEN_USAGE:
+ found_token_usage = True
+ dps = getattr(metric.data, "data_points", [])
+ for dp in dps:
+ token_type = dp.attributes.get(
+ "gen_ai.token.type", "unknown"
+ )
+ print(
+ f" Token type: {token_type}, Sum: {dp.sum}, Count: {dp.count}"
+ )
+
+ print("\n" + "=" * 80)
+ status = []
+ if found_duration:
+ status.append("Duration: OK")
+ if found_token_usage:
+ status.append("Token Usage: OK")
+ if not found_duration and not found_token_usage:
+ status.append("No metrics (use real API for metrics)")
+
+ print("Status: " + " | ".join(status))
+ print("=" * 80)
diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_workflow_agent.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_workflow_agent.py
new file mode 100644
index 00000000..e0dd0f26
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_workflow_agent.py
@@ -0,0 +1,180 @@
+"""
+Test Workflow-based agent instrumentation.
+
+This test validates that workflow event streaming captures agent steps and tool calls.
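+
+Run directly (python test_workflow_agent.py) or via pytest; the async test is
+marked with @pytest.mark.asyncio, which assumes pytest-asyncio is installed.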
+"""
+
+import asyncio
+import pytest
+from typing import List
+from llama_index.core.agent import ReActAgent
+from llama_index.core import Settings
+from llama_index.core.llms import MockLLM
+from llama_index.core.base.llms.types import ChatMessage, ChatResponse, MessageRole
+from llama_index.core.tools import FunctionTool
+from opentelemetry import trace
+from opentelemetry.instrumentation.llamaindex import LlamaindexInstrumentor
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
+
+
+def multiply(a: int, b: int) -> int:
+ """Multiply two numbers."""
+ return a * b
+
+
+def add(a: int, b: int) -> int:
+ """Add two numbers."""
+ return a + b
+
+
+def setup_telemetry():
+ """Setup OpenTelemetry with console exporter."""
+ trace.set_tracer_provider(TracerProvider())
+ tracer_provider = trace.get_tracer_provider()
+ tracer_provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
+ return tracer_provider
+
+
+class SequenceMockLLM(MockLLM):
+ responses: List[ChatMessage] = []
+ response_index: int = 0
+
+ def __init__(self, responses: List[ChatMessage], max_tokens: int = 256):
+ super().__init__(max_tokens=max_tokens)
+ self.responses = responses
+ self.response_index = 0
+
+    def chat(self, messages, **kwargs):
+        if self.response_index < len(self.responses):
+            response = self.responses[self.response_index]
+            self.response_index += 1
+            return ChatResponse(message=response)
+        return ChatResponse(
+            message=ChatMessage(role=MessageRole.ASSISTANT, content="Done.")
+        )
+
+    async def achat(self, messages, **kwargs):
+        if self.response_index < len(self.responses):
+            response = self.responses[self.response_index]
+            self.response_index += 1
+            return ChatResponse(message=response)
+        return ChatResponse(
+            message=ChatMessage(role=MessageRole.ASSISTANT, content="Done.")
+        )
+
+    def stream_chat(self, messages, **kwargs):
+        if self.response_index < len(self.responses):
+            response = self.responses[self.response_index]
+            self.response_index += 1
+            # Yield a single response chunk
+            yield ChatResponse(message=response, delta=response.content)
+        else:
+            yield ChatResponse(
+                message=ChatMessage(role=MessageRole.ASSISTANT, content="Done."),
+                delta="Done.",
+            )
+
+    async def astream_chat(self, messages, **kwargs):
+        async def gen():
+            if self.response_index < len(self.responses):
+                response = self.responses[self.response_index]
+                self.response_index += 1
+                # Yield a single response chunk
+                yield ChatResponse(message=response, delta=response.content)
+            else:
+                yield ChatResponse(
+                    message=ChatMessage(role=MessageRole.ASSISTANT, content="Done."),
+                    delta="Done.",
+                )
+
+        return gen()
+
+
+@pytest.mark.asyncio
+async def test_workflow_agent():
+ """Test Workflow-based agent instrumentation."""
+
+ print("=" * 80)
+ print("Setting up telemetry...")
+ print("=" * 80)
+ tracer_provider = setup_telemetry()
+
+ # Setup Mock LLM
+ mock_responses = [
+ # Step 1: Decide to multiply
+ ChatMessage(
+ role=MessageRole.ASSISTANT,
+ content="""Thought: I need to multiply 5 by 3 first.
+Action: multiply
+Action Input: {"a": 5, "b": 3}""",
+ ),
+ # Step 2: Decide to add
+ ChatMessage(
+ role=MessageRole.ASSISTANT,
+ content="""Thought: The result is 15. Now I need to add 2 to 15.
+Action: add
+Action Input: {"a": 15, "b": 2}""",
+ ),
+ # Step 3: Final Answer
+ ChatMessage(
+ role=MessageRole.ASSISTANT,
+ content="""Thought: The final result is 17.
+Answer: The result is 17.""",
+ ),
+ ]
+ Settings.llm = SequenceMockLLM(responses=mock_responses, max_tokens=256)
+
+ # Instrument
+ print("\n" + "=" * 80)
+ print("Instrumenting LlamaIndex...")
+ print("=" * 80)
+ instrumentor = LlamaindexInstrumentor()
+ instrumentor.instrument(tracer_provider=tracer_provider)
+
+ # Create tools
+ multiply_tool = FunctionTool.from_defaults(fn=multiply)
+ add_tool = FunctionTool.from_defaults(fn=add)
+
+ print("\n" + "=" * 80)
+ print("Creating Workflow-based ReActAgent...")
+ print("=" * 80)
+ agent = ReActAgent(tools=[multiply_tool, add_tool], llm=Settings.llm, verbose=True)
+
+ print("\n" + "=" * 80)
+ print("Running agent task (should see AgentInvocation -> ToolCall spans)...")
+ print("=" * 80)
+
+ handler = agent.run(user_msg="Calculate 5 times 3, then add 2 to the result")
+ result = await handler
+
+ # Give background instrumentation task time to complete
+ await asyncio.sleep(0.5)
+
+ print("\n" + "=" * 80)
+ print("RESULTS")
+ print("=" * 80)
+ print(f"Response: {result.response.content}")
+
+ print("\n" + "=" * 80)
+ print("✓ Test completed!")
+ print("=" * 80)
+ print("\nExpected trace structure:")
+ print(" AgentInvocation (gen_ai.agent.name=agent.Agent)")
+ print(" ├─ LLMInvocation")
+ print(" ├─ ToolCall (gen_ai.tool.name=multiply)")
+ print(" ├─ ToolCall (gen_ai.tool.name=add)")
+ print(" └─ LLMInvocation (final answer)")
+ print("=" * 80)
+
+
+if __name__ == "__main__":
+ asyncio.run(test_workflow_agent())