diff --git a/posthog/urls.py b/posthog/urls.py
index e9a95d8a3a44..1be3e1e17a0d 100644
--- a/posthog/urls.py
+++ b/posthog/urls.py
@@ -47,6 +47,7 @@
 from posthog.temporal.codec_server import decode_payloads
 from products.early_access_features.backend.api import early_access_features
+from products.llm_analytics.backend.api.otel.ingestion import otel_logs_endpoint, otel_traces_endpoint
 from .utils import opt_slash_path, render_template
 from .views import (
@@ -168,6 +169,10 @@ def authorize_and_redirect(request: HttpRequest) -> HttpResponse:
     # ee
     *ee_urlpatterns,
     # api
+    # OpenTelemetry traces ingestion for LLM Analytics
+    path("api/projects/<int:project_id>/ai/otel/traces", csrf_exempt(otel_traces_endpoint)),
+    # OpenTelemetry logs ingestion for LLM Analytics
+    path("api/projects/<int:project_id>/ai/otel/logs", csrf_exempt(otel_logs_endpoint)),
     path("api/environments/<str:team_id>/progress/", progress),
     path("api/environments/<str:team_id>/query/<str:query_id>/progress/", progress),
     path("api/environments/<str:team_id>/query/<str:query_id>/progress", progress),
diff --git a/products/llm_analytics/backend/api/otel/README.md b/products/llm_analytics/backend/api/otel/README.md
new file mode 100644
index 000000000000..4bc04dcde726
--- /dev/null
+++ b/products/llm_analytics/backend/api/otel/README.md
@@ -0,0 +1,463 @@
+# OpenTelemetry AI Event Ingestion
+
+This module transforms OpenTelemetry traces and logs into PostHog AI events for LLM observability.
+
+## Overview
+
+The OTEL ingestion pipeline accepts OpenTelemetry Protocol (OTLP) data via HTTP endpoints and converts it into PostHog AI events that comply with the PostHog LLM Analytics schema. The system supports two instrumentation patterns (v1 and v2) that differ in how they deliver trace and log data.
+
+## Architecture
+
+```text
+OTLP HTTP Request
+    |
+    v
+ingestion.py (parse protobuf)
+    |
+    +---> parser.py (decode spans)
+    |        |
+    |        v
+    |    transformer.py (detect v1/v2, transform spans)
+    |        |
+    |        +---> conventions/ (extract attributes)
+    |        |        |
+    |        |        +---> posthog_native.py
+    |        |        +---> genai.py
+    |        |                 |
+    |        |                 +---> providers/ (framework-specific transformations)
+    |        |                          |
+    |        |                          +---> mastra.py
+    |        |                          +---> base.py
+    |        |
+    |        +---> event_merger.py (Redis cache for v2)
+    |
+    +---> logs_parser.py (decode logs)
+             |
+             v
+         logs_transformer.py (transform logs)
+             |
+             v
+         event_merger.py (merge with traces for v2)
+             |
+             v
+         PostHog AI Events ($ai_generation, $ai_span, etc.)
+             |
+             v
+         capture_batch_internal (PostHog capture API)
+             |
+             v
+         Kafka (events_plugin_ingestion topic)
+             |
+             v
+         Plugin-server (ingestion-consumer)
+             |
+             +---> process-ai-event.ts (cost calculation, normalization)
+                      |
+                      v
+                  ClickHouse (sharded_events table)
+```
+
+**Post-transformation flow**:
+
+1. **capture_batch_internal**: OTEL-transformed events enter PostHog's standard event ingestion pipeline
+2. **Kafka**: Events are published to the `events_plugin_ingestion` topic for async processing
+3. **Plugin-server**: The ingestion consumer processes events through `process-ai-event.ts`:
+   - Calculates costs based on token counts and model pricing (`$ai_input_cost_usd`, `$ai_output_cost_usd`, `$ai_total_cost_usd`)
+   - Normalizes trace IDs to strings
+   - Extracts model parameters (temperature, max_tokens, stream) from `$ai_model_parameters`
+4. 
**ClickHouse**: Events are written to the sharded_events table for querying in PostHog UI + +## Instrumentation Patterns + +### v1 Instrumentation + +**Data Model**: All data in span attributes + +v1 instrumentation sends complete LLM call data within span attributes using indexed fields: + +- Prompts: `gen_ai.prompt.0.role`, `gen_ai.prompt.0.content` +- Completions: `gen_ai.completion.0.role`, `gen_ai.completion.0.content` +- Metadata: `gen_ai.request.model`, `gen_ai.usage.input_tokens`, etc. + +**Processing**: The transformer recognizes v1 in two ways: + +1. Span contains `prompt` or `completion` attributes (after extraction) +2. Framework detection via instrumentation scope name (e.g., `@mastra/otel` for Mastra) + +When detected, events are sent immediately without caching since v1 spans are self-contained. + +**Packages**: `opentelemetry-instrumentation-openai`, Mastra framework (`@mastra/otel-exporter`) + +### v2 Instrumentation + +**Data Model**: Separated metadata and content + +v2 instrumentation splits LLM call data across two channels: + +- Traces: Model name, token counts, timing, span structure +- Logs: Message content (user prompts, assistant completions, tool calls) + +**Processing**: The event merger provides bidirectional merging - either traces or logs can arrive first. The first arrival caches data in Redis, the second arrival merges and returns a complete event. Multiple log events for the same span are accumulated atomically before merging to ensure completeness. + +**Package**: `opentelemetry-instrumentation-openai-v2` + +**Design Rationale**: Separating traces and logs allows v2 to stream content while maintaining trace context, but requires a merge layer to recombine data into complete events. + +## Components + +### ingestion.py + +Main entry point for OTLP HTTP requests. Parses protobuf payloads and routes to appropriate transformers. For v2 logs, groups all log records by (trace_id, span_id) before processing to ensure atomic accumulation of multi-log spans. + +### transformer.py + +Converts OTel spans to PostHog AI events using a waterfall attribute extraction pattern: + +1. Extract PostHog-native attributes (highest priority) +2. Extract GenAI semantic convention attributes (fallback, with provider transformers) +3. Merge with PostHog attributes taking precedence + +Determines event type based on span characteristics: + +- `$ai_generation`: LLM completion requests (has model, tokens, and input) +- `$ai_embedding`: Embedding requests (operation_name matches embedding patterns) +- `$ai_trace`: Root spans (no parent) for v2 frameworks +- `$ai_span`: All other spans, including root spans from v1 frameworks + +**Pattern Detection**: Uses `OtelInstrumentationPattern` enum to determine routing: + +1. Provider declares pattern via `get_instrumentation_pattern()` (most reliable) +2. Span has `prompt` or `completion` attributes (indicates V1 data present) +3. Span is an embedding operation (embeddings don't have associated logs) +4. Default to V2 (safer - waits for logs rather than sending incomplete) + +V1 spans bypass the event merger and are sent immediately. + +**Event Type Logic**: For V1 frameworks, root spans are marked as `$ai_span` (not `$ai_trace`) to ensure they appear in the tree hierarchy. This is necessary because `TraceQueryRunner` filters out `$ai_trace` events from the events array. + +### logs_transformer.py + +Converts OTel log records to AI event properties. Extracts message content from log body and metadata from log attributes. 
Designed to work with event_merger for v2 ingestion. + +### event_merger.py + +Redis-based non-blocking cache for v2 trace/log coordination. Uses simple Redis operations (get/setex/delete) for fast caching and retrieval. Keys expire after 60 seconds to prevent orphaned entries. + +**Merge Logic**: + +- First arrival: Cache data, return None (event not ready) +- Second arrival: Retrieve cached data, merge properties, delete cache, return complete event + +**Key Pattern**: `otel_merge:{type}:{trace_id}:{span_id}` + +### parser.py / logs_parser.py + +Decode OTLP protobuf messages into Python dictionaries. Handle type conversions (bytes to hex for IDs, nanoseconds to seconds for timestamps) and attribute flattening. + +**Return Format**: Both parsers return a list of items where each item contains its own resource and scope context: + +- `parse_otlp_request()`: Returns `[{"span": {...}, "resource": {...}, "scope": {...}}, ...]` +- `parse_otlp_logs_request()`: Returns `[{"log": {...}, "resource": {...}, "scope": {...}}, ...]` + +This per-item format ensures correct resource/scope attribution when a single OTLP request contains multiple `resource_spans`/`resource_logs` (e.g., from different services or scopes). + +### conventions/ + +Attribute extraction modules implementing semantic conventions: + +**posthog_native.py**: Extracts PostHog-specific attributes prefixed with `posthog.ai.*`. These take precedence in the waterfall. + +**genai.py**: Extracts OpenTelemetry GenAI semantic convention attributes (`gen_ai.*`). Handles indexed message fields by collecting attributes like `gen_ai.prompt.0.role` into structured message arrays. Provides `detect_provider()` function for centralized provider detection. Supports provider-specific transformations for frameworks that use custom OTEL formats. + +**providers/**: Framework-specific transformers for handling custom OTEL formats: + +- **base.py**: Abstract base class defining the provider transformer interface: + - `can_handle()`: Detect if transformer handles this span + - `transform_prompt()`: Transform provider-specific prompt format + - `transform_completion()`: Transform provider-specific completion format + - `get_instrumentation_pattern()`: Declare V1 or V2 pattern (returns `OtelInstrumentationPattern` enum) +- **mastra.py**: Transforms Mastra's wrapped message format (e.g., `{"messages": [...]}` for input, `{"text": "...", "files": [], ...}` for output) into standard PostHog format. Detected by instrumentation scope name `@mastra/otel`. Declares `V1_ATTRIBUTES` pattern. 
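+
+As a rough sketch, the waterfall combining these extractors looks like this (illustrative only; the real merge lives in `transformer.py`, and the import paths are assumed from this package's layout):
+
+```python
+from conventions.genai import extract_genai_attributes
+from conventions.posthog_native import extract_posthog_native_attributes
+
+def waterfall_extract(span: dict, scope: dict) -> dict:
+    genai_attrs = extract_genai_attributes(span, scope)  # fallback layer
+    posthog_attrs = extract_posthog_native_attributes(span)  # highest priority
+    return {**genai_attrs, **posthog_attrs}  # PostHog-native wins on conflicts
+```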
+ +## Event Schema + +All events conform to the PostHog LLM Analytics schema: + +**Core Properties**: + +- `$ai_input`: Array of message objects `[{role: str, content: str}]` +- `$ai_output_choices`: Array of completion objects `[{role: str, content: str}]` +- `$ai_model`: Model identifier (e.g., "gpt-4o-mini") +- `$ai_provider`: Provider name (e.g., "openai") +- `$ai_input_tokens`: Input token count +- `$ai_output_tokens`: Output token count + +**Trace Context**: + +- `$ai_trace_id`, `$ai_span_id`, `$ai_parent_id` +- `$ai_session_id`, `$ai_generation_id` + +**Metrics**: + +- `$ai_latency`: Duration in seconds +- `$ai_total_cost_usd`, `$ai_input_cost_usd`, `$ai_output_cost_usd` + +**Configuration**: + +- `$ai_temperature`, `$ai_max_tokens`, `$ai_stream` +- `$ai_tools`: Array of tool definitions + +**Error Tracking**: + +- `$ai_is_error`: Boolean +- `$ai_error`: Error message string + +## API Endpoints + +**Traces**: `POST /api/projects/{project_id}/ai/otel/traces` + +- Content-Type: `application/x-protobuf` +- Authorization: `Bearer {project_api_key}` +- Accepts OTLP trace payloads + +**Logs**: `POST /api/projects/{project_id}/ai/otel/logs` + +- Content-Type: `application/x-protobuf` +- Authorization: `Bearer {project_api_key}` +- Accepts OTLP log payloads + +## Testing + +Run unit tests: + +```bash +pytest products/llm_analytics/backend/api/otel/ +``` + +Integration testing requires: + +1. Running PostHog instance with OTEL endpoints enabled +2. OpenTelemetry SDK configured to send to local endpoints +3. Redis instance for event merger cache + +Example v2 test configuration: + +```python +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter + +trace_exporter = OTLPSpanExporter( + endpoint=f"{posthog_host}/api/projects/{project_id}/ai/otel/traces", + headers={"Authorization": f"Bearer {api_key}"} +) + +log_exporter = OTLPLogExporter( + endpoint=f"{posthog_host}/api/projects/{project_id}/ai/otel/logs", + headers={"Authorization": f"Bearer {api_key}"} +) +``` + +## Design Decisions + +### Waterfall Attribute Extraction + +PostHog-native attributes take precedence over GenAI conventions, allowing instrumentation to override standard attributes when needed. This provides flexibility for custom instrumentation while maintaining compatibility with standard OTEL instrumentation. + +### Non-Blocking Event Merger + +The merger returns None on first arrival rather than blocking. This prevents the ingestion pipeline from waiting on Redis and keeps request processing fast. The tradeoff is that v2 requires two round-trips (trace + logs) before emitting events. + +### Atomic Log Accumulation + +v2 can send multiple log events in a single HTTP request. The ingestion layer groups these by (trace_id, span_id) and accumulates their properties before calling the merger. This prevents race conditions where partial log data gets merged before all logs arrive. + +### Pattern Detection via Provider Transformers + +Rather than hardcoding framework names, the transformer uses a layered detection approach: + +1. **Provider declaration** (most reliable): Providers implement `get_instrumentation_pattern()` returning `OtelInstrumentationPattern.V1_ATTRIBUTES` or `V2_TRACES_AND_LOGS` +2. **Content detection** (fallback): Span has `prompt` or `completion` attributes after extraction +3. 
**Safe default**: Unknown providers default to V2 (waits for logs rather than sending incomplete events) + +This allows both patterns to coexist without configuration, and new providers only need to declare their pattern in one place. + +### Provider Transformers + +Some frameworks (like Mastra) wrap OTEL data in custom structures that don't match standard GenAI conventions. Provider transformers detect these frameworks (via instrumentation scope or attribute prefixes) and unwrap their data into standard format. This keeps framework-specific logic isolated while maintaining compatibility with the core transformer pipeline. + +**Example**: Mastra wraps prompts as `{"messages": [{"role": "user", "content": [...]}]}` where content is an array of `{"type": "text", "text": "..."}` objects. The Mastra transformer unwraps this into standard `[{"role": "user", "content": "..."}]` format. + +### Event Type Determination for V1 Frameworks + +V1 frameworks create root spans that should appear in the tree hierarchy alongside their children. The `determine_event_type()` function checks `provider.get_instrumentation_pattern()` and marks V1 root spans as `$ai_span` (not `$ai_trace`) because `TraceQueryRunner` filters out `$ai_trace` events from the events array. This ensures V1 framework traces display correctly with proper parent-child relationships in the UI. + +### TTL-Based Cleanup + +The event merger uses 60-second TTL on cache entries. This automatically cleans up orphaned data from incomplete traces (e.g., lost log packets) without requiring background jobs or manual cleanup. + +## Extending the System + +### Adding New Provider Transformers + +Create a new transformer in `conventions/providers/`: + +```python +from .base import OtelInstrumentationPattern, ProviderTransformer +from typing import Any +import json + +class CustomFrameworkTransformer(ProviderTransformer): + """Transform CustomFramework's OTEL format.""" + + def can_handle(self, span: dict[str, Any], scope: dict[str, Any]) -> bool: + """Detect CustomFramework by scope name or attributes.""" + scope_name = scope.get("name", "") + return scope_name == "custom-framework-scope" + + def get_instrumentation_pattern(self) -> OtelInstrumentationPattern: + """Declare V1 or V2 pattern - determines event routing.""" + # V1: All data in span attributes, send immediately + # V2: Metadata in spans, content in logs, requires merge + return OtelInstrumentationPattern.V1_ATTRIBUTES + + def transform_prompt(self, prompt: Any) -> Any: + """Transform wrapped prompt format to standard.""" + if not isinstance(prompt, str): + return None + + try: + parsed = json.loads(prompt) + # Transform custom format to standard + return [{"role": "user", "content": parsed["text"]}] + except (json.JSONDecodeError, KeyError): + return None + + def transform_completion(self, completion: Any) -> Any: + """Transform wrapped completion format to standard.""" + # Similar transformation logic + pass +``` + +Register in `conventions/providers/__init__.py`: + +```python +from .custom_framework import CustomFrameworkTransformer + +PROVIDER_TRANSFORMERS = [ + CustomFrameworkTransformer, + MastraTransformer, +] +``` + +### Adding New Semantic Conventions + +Create a new extractor in `conventions/`: + +```python +def extract_custom_attributes(span: dict[str, Any]) -> dict[str, Any]: + attributes = span.get("attributes", {}) + result = {} + + # Extract custom attributes + if custom_attr := attributes.get("custom.attribute"): + result["custom_field"] = custom_attr + + return result 
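+
+# Hypothetical usage with an already-parsed span (the names above are illustrative):
+#   extract_custom_attributes({"attributes": {"custom.attribute": "x"}})
+#   -> {"custom_field": "x"}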
+``` + +Add to waterfall in `transformer.py`: + +```python +custom_attrs = extract_custom_attributes(span) +merged_attrs = {**genai_attrs, **posthog_attrs, **custom_attrs} +``` + +### Supporting New Event Types + +Add logic to `determine_event_type()` in `transformer.py`: + +```python +def determine_event_type(span: dict[str, Any], attrs: dict[str, Any]) -> str: + op_name = attrs.get("operation_name", "").lower() + + if op_name == "new_operation": + return "$ai_new_event_type" + # ... existing logic +``` + +### Custom Property Mapping + +Extend `build_event_properties()` in `transformer.py` to map additional attributes to event properties. + +## Performance Characteristics + +- **Throughput**: Limited by Redis round-trip time for v2 merging +- **Latency**: v1 has single-pass latency, v2 has cache lookup latency +- **Memory**: Redis cache bounded by TTL (60s max retention) +- **Concurrency**: Simple Redis operations enable fast merging with minimal race condition risk + +## Provider Reference + +Different LLM frameworks implement OTEL instrumentation with their own nuances. This section documents known provider behaviors to help understand what to expect from each. + +### Mastra (`@mastra/otel`) + +**Detection**: Instrumentation scope name `@mastra/otel` or `mastra.*` attribute prefix + +**OTEL Pattern**: `V1_ATTRIBUTES` (all data in span attributes) + +**Key Behaviors**: + +- **No conversation history accumulation**: Each `agent.generate()` call creates a separate, independent trace. The `gen_ai.prompt` only contains that specific call's input (typically system message + current user message), not the accumulated conversation history from previous turns. +- **Wrapped message format**: Prompts are JSON-wrapped as `{"messages": [{"role": "user", "content": [{"type": "text", "text": "..."}]}]}` where content is an array of typed objects. +- **Wrapped completion format**: Completions are JSON-wrapped as `{"text": "...", "files": [], "warnings": [], ...}`. +- **Multi-turn traces**: In a multi-turn conversation, you'll see multiple separate traces (one per `agent.generate()` call), each showing only that turn's input/output. + +**Implications for PostHog**: + +- Each turn appears as a separate trace in LLM Analytics +- To see full conversation context, users need to look at the sequence of traces +- The Mastra transformer unwraps the custom JSON format into standard PostHog message arrays + +**Example**: A 4-turn conversation produces 4 traces, where turn 4's input only shows "Thanks, bye!" (not the previous greeting, weather query, and joke request). + +### OpenTelemetry Instrumentation OpenAI v1 (`opentelemetry-instrumentation-openai`) + +**Detection**: Span attributes with indexed prompt/completion fields (no custom provider transformer needed - uses standard GenAI conventions) + +**OTEL Pattern**: `V1_ATTRIBUTES` (all data in span attributes) + +**Key Behaviors**: + +- **Full conversation in each call**: The `gen_ai.prompt.*` attributes contain all messages passed to the API call +- **Indexed attributes**: Messages use `gen_ai.prompt.0.role`, `gen_ai.prompt.0.content`, etc. 
+- **Direct attribute format**: No JSON wrapping, values are stored directly as span attributes + +**Implications for PostHog**: + +- If the application maintains conversation state, later turns show full history +- Each trace is self-contained with complete context + +### OpenTelemetry Instrumentation OpenAI v2 (`opentelemetry-instrumentation-openai-v2`) + +**Detection**: Spans without prompt/completion attributes, accompanied by OTEL log events (no custom provider transformer needed - detected by absence of V1 content) + +**OTEL Pattern**: `V2_TRACES_AND_LOGS` (traces + logs separated) + +**Key Behaviors**: + +- **Split data model**: Traces contain metadata (model, tokens, timing), logs contain message content +- **Log events**: Uses `gen_ai.user.message`, `gen_ai.assistant.message`, `gen_ai.tool.message`, etc. +- **Full conversation in each call**: Like v1, if the app maintains state, messages accumulate +- **Requires merge**: PostHog's event merger combines traces and logs into complete events + +**Implications for PostHog**: + +- Slightly higher latency due to merge process +- Supports streaming better than v1 +- Both traces and logs endpoints must be configured + +## References + +- [OpenTelemetry GenAI Semantic Conventions](https://opentelemetry.io/docs/specs/semconv/gen-ai/) +- [OTLP Specification](https://opentelemetry.io/docs/specs/otlp/) +- [PostHog AI Engineering Documentation](https://posthog.com/docs/ai-engineering) diff --git a/products/llm_analytics/backend/api/otel/__init__.py b/products/llm_analytics/backend/api/otel/__init__.py new file mode 100644 index 000000000000..1cffc22cfd30 --- /dev/null +++ b/products/llm_analytics/backend/api/otel/__init__.py @@ -0,0 +1 @@ +# OpenTelemetry traces ingestion for PostHog LLM Analytics diff --git a/products/llm_analytics/backend/api/otel/conventions/__init__.py b/products/llm_analytics/backend/api/otel/conventions/__init__.py new file mode 100644 index 000000000000..06235538a984 --- /dev/null +++ b/products/llm_analytics/backend/api/otel/conventions/__init__.py @@ -0,0 +1,7 @@ +""" +OpenTelemetry semantic conventions for LLM traces. + +Supports: +- PostHog native: posthog.ai.* attributes (highest priority) +- GenAI: gen_ai.* attributes (fallback) +""" diff --git a/products/llm_analytics/backend/api/otel/conventions/genai.py b/products/llm_analytics/backend/api/otel/conventions/genai.py new file mode 100644 index 000000000000..83e7f74c08e9 --- /dev/null +++ b/products/llm_analytics/backend/api/otel/conventions/genai.py @@ -0,0 +1,187 @@ +""" +GenAI semantic conventions for OpenTelemetry. + +Implements the GenAI semantic conventions (gen_ai.*) as fallback +when PostHog-native attributes are not present. + +Supports provider-specific transformations for frameworks like Mastra +that use custom OTEL formats. + +Reference: https://opentelemetry.io/docs/specs/semconv/gen-ai/ +""" + +from collections import defaultdict +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from .providers.base import ProviderTransformer + + +def has_genai_attributes(span: dict[str, Any]) -> bool: + """Check if span uses GenAI semantic conventions.""" + attributes = span.get("attributes", {}) + return any(key.startswith("gen_ai.") for key in attributes.keys()) + + +def _extract_indexed_messages(attributes: dict[str, Any], prefix: str) -> list[dict[str, Any]] | None: + """ + Extract indexed message attributes like gen_ai.prompt.{N}.{field} into a list of message dicts. 
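+
+    For example, {"gen_ai.prompt.0.role": "user", "gen_ai.prompt.0.content": "Hi"}
+    with prefix "gen_ai.prompt" yields [{"role": "user", "content": "Hi"}].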
+ + Args: + attributes: Span attributes dictionary + prefix: Message prefix (e.g., "gen_ai.prompt" or "gen_ai.completion") + + Returns: + List of message dicts with role, content, etc., or None if no messages found + """ + # Group attributes by index + messages_by_index: dict[int, dict[str, Any]] = defaultdict(dict) + + for key, value in attributes.items(): + if not key.startswith(f"{prefix}."): + continue + + # Parse: gen_ai.prompt.0.role -> index=0, field=role + parts = key[len(prefix) + 1 :].split(".", 1) + if len(parts) != 2: + continue + + try: + index = int(parts[0]) + field = parts[1] + messages_by_index[index][field] = value + except (ValueError, IndexError): + continue + + if not messages_by_index: + return None + + # Convert to sorted list of messages + messages = [] + for index in sorted(messages_by_index.keys()): + msg = messages_by_index[index] + if msg: # Only include non-empty messages + messages.append(msg) + + return messages if messages else None + + +def detect_provider(span: dict[str, Any], scope: dict[str, Any] | None = None) -> "ProviderTransformer | None": + """ + Detect which provider transformer handles this span. + + Args: + span: Parsed OTEL span + scope: Instrumentation scope info + + Returns: + Matching ProviderTransformer instance, or None if no provider matches + """ + from .providers import PROVIDER_TRANSFORMERS + + scope = scope or {} + for transformer_class in PROVIDER_TRANSFORMERS: + transformer = transformer_class() + if transformer.can_handle(span, scope): + return transformer + return None + + +def extract_genai_attributes(span: dict[str, Any], scope: dict[str, Any] | None = None) -> dict[str, Any]: + """ + Extract GenAI semantic convention attributes from span. + + GenAI conventions use `gen_ai.*` prefix and are fallback + when PostHog-native attributes are not present. + + Supports provider-specific transformations for frameworks that use + custom OTEL formats (e.g., Mastra). + + Args: + span: Parsed OTEL span + scope: Instrumentation scope info (for provider detection) + + Returns: + Extracted attributes dict + """ + attributes = span.get("attributes", {}) + scope = scope or {} + result: dict[str, Any] = {} + + # Detect provider-specific transformer + provider_transformer = detect_provider(span, scope) + + # Model (prefer request, fallback to response, then system) + model = ( + attributes.get("gen_ai.request.model") + or attributes.get("gen_ai.response.model") + or attributes.get("gen_ai.model") + ) + if model is not None: + result["model"] = model + + # Provider (from gen_ai.system) + if (system := attributes.get("gen_ai.system")) is not None: + result["provider"] = system + + # Operation name + if (operation_name := attributes.get("gen_ai.operation.name")) is not None: + result["operation_name"] = operation_name + + # Token usage + if (input_tokens := attributes.get("gen_ai.usage.input_tokens")) is not None: + result["input_tokens"] = input_tokens + if (output_tokens := attributes.get("gen_ai.usage.output_tokens")) is not None: + result["output_tokens"] = output_tokens + + # Content (prompt and completion) + # Try indexed messages first (gen_ai.prompt.0.role, gen_ai.prompt.0.content, etc.) 
+ prompts = _extract_indexed_messages(attributes, "gen_ai.prompt") + if prompts: + result["prompt"] = prompts + # Fallback to direct gen_ai.prompt attribute + elif (prompt := attributes.get("gen_ai.prompt")) is not None: + # Try provider-specific transformation + if provider_transformer: + transformed = provider_transformer.transform_prompt(prompt) + if transformed is not None: + result["prompt"] = transformed + else: + result["prompt"] = prompt + else: + result["prompt"] = prompt + + completions = _extract_indexed_messages(attributes, "gen_ai.completion") + if completions: + result["completion"] = completions + # Fallback to direct gen_ai.completion attribute + elif (completion := attributes.get("gen_ai.completion")) is not None: + # Try provider-specific transformation + if provider_transformer: + transformed = provider_transformer.transform_completion(completion) + if transformed is not None: + result["completion"] = transformed + else: + result["completion"] = completion + else: + result["completion"] = completion + + # Model parameters + if (temperature := attributes.get("gen_ai.request.temperature")) is not None: + result["temperature"] = temperature + if (max_tokens := attributes.get("gen_ai.request.max_tokens")) is not None: + result["max_tokens"] = max_tokens + if (top_p := attributes.get("gen_ai.request.top_p")) is not None: + result["top_p"] = top_p + if (frequency_penalty := attributes.get("gen_ai.request.frequency_penalty")) is not None: + result["frequency_penalty"] = frequency_penalty + if (presence_penalty := attributes.get("gen_ai.request.presence_penalty")) is not None: + result["presence_penalty"] = presence_penalty + + # Response metadata + if (finish_reasons := attributes.get("gen_ai.response.finish_reasons")) is not None: + result["finish_reasons"] = finish_reasons + if (response_id := attributes.get("gen_ai.response.id")) is not None: + result["response_id"] = response_id + + return result diff --git a/products/llm_analytics/backend/api/otel/conventions/posthog_native.py b/products/llm_analytics/backend/api/otel/conventions/posthog_native.py new file mode 100644 index 000000000000..d8fe30eb0881 --- /dev/null +++ b/products/llm_analytics/backend/api/otel/conventions/posthog_native.py @@ -0,0 +1,88 @@ +""" +PostHog-native OpenTelemetry conventions. + +Attributes with `posthog.ai.*` prefix have highest priority in the waterfall. +""" + +from typing import Any + + +def has_posthog_attributes(span: dict[str, Any]) -> bool: + """Check if span uses PostHog native conventions.""" + attributes = span.get("attributes", {}) + return any(key.startswith("posthog.ai.") for key in attributes.keys()) + + +def extract_posthog_native_attributes(span: dict[str, Any]) -> dict[str, Any]: + """ + Extract PostHog-native attributes from span. + + PostHog-native convention uses `posthog.ai.*` prefix. + This takes highest priority in the waterfall pattern. 
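+
+    For example, a span attribute posthog.ai.model = "gpt-4o" is returned
+    as {"model": "gpt-4o"}.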
+ """ + attributes = span.get("attributes", {}) + result: dict[str, Any] = {} + + # Helper to get attribute with prefix + def get_attr(key: str) -> Any: + return attributes.get(f"posthog.ai.{key}") + + # Core identifiers + if (model := get_attr("model")) is not None: + result["model"] = model + if (provider := get_attr("provider")) is not None: + result["provider"] = provider + if (trace_id := get_attr("trace_id")) is not None: + result["trace_id"] = trace_id + if (span_id := get_attr("span_id")) is not None: + result["span_id"] = span_id + if (parent_id := get_attr("parent_id")) is not None: + result["parent_id"] = parent_id + if (session_id := get_attr("session_id")) is not None: + result["session_id"] = session_id + if (generation_id := get_attr("generation_id")) is not None: + result["generation_id"] = generation_id + + # Token usage + if (input_tokens := get_attr("input_tokens")) is not None: + result["input_tokens"] = input_tokens + if (output_tokens := get_attr("output_tokens")) is not None: + result["output_tokens"] = output_tokens + if (cache_read_tokens := get_attr("cache_read_tokens")) is not None: + result["cache_read_tokens"] = cache_read_tokens + if (cache_write_tokens := get_attr("cache_write_tokens")) is not None: + result["cache_write_tokens"] = cache_write_tokens + + # Cost + if (input_cost_usd := get_attr("input_cost_usd")) is not None: + result["input_cost_usd"] = input_cost_usd + if (output_cost_usd := get_attr("output_cost_usd")) is not None: + result["output_cost_usd"] = output_cost_usd + if (total_cost_usd := get_attr("total_cost_usd")) is not None: + result["total_cost_usd"] = total_cost_usd + + # Operation + if (operation_name := get_attr("operation_name")) is not None: + result["operation_name"] = operation_name + + # Content + if (input_content := get_attr("input")) is not None: + result["input"] = input_content + if (output_content := get_attr("output")) is not None: + result["output"] = output_content + + # Model parameters + if (temperature := get_attr("temperature")) is not None: + result["temperature"] = temperature + if (max_tokens := get_attr("max_tokens")) is not None: + result["max_tokens"] = max_tokens + if (stream := get_attr("stream")) is not None: + result["stream"] = stream + + # Error tracking + if (is_error := get_attr("is_error")) is not None: + result["is_error"] = is_error + if (error_message := get_attr("error_message")) is not None: + result["error_message"] = error_message + + return result diff --git a/products/llm_analytics/backend/api/otel/conventions/providers/__init__.py b/products/llm_analytics/backend/api/otel/conventions/providers/__init__.py new file mode 100644 index 000000000000..24eacf7aad3a --- /dev/null +++ b/products/llm_analytics/backend/api/otel/conventions/providers/__init__.py @@ -0,0 +1,23 @@ +""" +Provider-specific OTEL transformers. + +Each provider (Mastra, Langchain, LlamaIndex, etc.) handles their +specific OTEL format quirks and normalizes to PostHog format. +""" + +from .base import OtelInstrumentationPattern, ProviderTransformer +from .mastra import MastraTransformer + +# Registry of all available provider transformers +# Add new providers here as they're implemented +PROVIDER_TRANSFORMERS: list[type[ProviderTransformer]] = [ + MastraTransformer, + # Future: LangchainTransformer, LlamaIndexTransformer, etc. 
+] + +__all__ = [ + "OtelInstrumentationPattern", + "ProviderTransformer", + "MastraTransformer", + "PROVIDER_TRANSFORMERS", +] diff --git a/products/llm_analytics/backend/api/otel/conventions/providers/base.py b/products/llm_analytics/backend/api/otel/conventions/providers/base.py new file mode 100644 index 000000000000..982718495822 --- /dev/null +++ b/products/llm_analytics/backend/api/otel/conventions/providers/base.py @@ -0,0 +1,106 @@ +""" +Base provider transformer interface. + +Provider transformers handle framework/library-specific OTEL formats +and normalize them to PostHog's standard format. + +When adding a new provider transformer, document these aspects: +1. Detection method (scope name, attribute prefix, etc.) +2. OTEL pattern (v1 attributes-only vs v2 traces+logs) +3. Message format quirks (JSON wrapping, content arrays, etc.) +4. Conversation history behavior (accumulated vs per-call) +5. Any other notable behaviors + +See mastra.py for an example of well-documented provider behavior. +""" + +from abc import ABC, abstractmethod +from enum import Enum +from typing import Any + + +class OtelInstrumentationPattern(Enum): + """ + OTEL instrumentation patterns for LLM frameworks. + + V1_ATTRIBUTES: All data (metadata + content) in span attributes + - Send events immediately, no waiting for logs + - Example: opentelemetry-instrumentation-openai, Mastra + + V2_TRACES_AND_LOGS: Metadata in spans, content in separate log events + - Requires event merger to combine traces + logs + - Example: opentelemetry-instrumentation-openai-v2 + """ + + V1_ATTRIBUTES = "v1_attributes" + V2_TRACES_AND_LOGS = "v2_traces_and_logs" + + +class ProviderTransformer(ABC): + """ + Base class for provider-specific OTEL transformers. + + Each provider (Mastra, Langchain, LlamaIndex, etc.) can implement + a transformer to handle their specific OTEL format quirks. + """ + + @abstractmethod + def can_handle(self, span: dict[str, Any], scope: dict[str, Any]) -> bool: + """ + Detect if this transformer can handle the given span. + + Args: + span: Parsed OTEL span + scope: Instrumentation scope info + + Returns: + True if this transformer recognizes and can handle this span + """ + pass + + @abstractmethod + def transform_prompt(self, prompt: Any) -> Any: + """ + Transform provider-specific prompt format to standard format. + + Args: + prompt: Raw prompt value from gen_ai.prompt attribute + + Returns: + Normalized prompt (list of message dicts, string, or None if no transformation needed) + """ + pass + + @abstractmethod + def transform_completion(self, completion: Any) -> Any: + """ + Transform provider-specific completion format to standard format. + + Args: + completion: Raw completion value from gen_ai.completion attribute + + Returns: + Normalized completion (list of message dicts, string, or None if no transformation needed) + """ + pass + + def get_provider_name(self) -> str: + """ + Get the provider name for logging/debugging. + + Returns: + Human-readable provider name + """ + return self.__class__.__name__.replace("Transformer", "") + + def get_instrumentation_pattern(self) -> OtelInstrumentationPattern: + """ + Get the OTEL instrumentation pattern this provider uses. + + Override in subclass to declare the pattern. Default is V2_TRACES_AND_LOGS + for safety - better to wait for logs than to send incomplete events. 
+ + Returns: + The instrumentation pattern enum value + """ + return OtelInstrumentationPattern.V2_TRACES_AND_LOGS diff --git a/products/llm_analytics/backend/api/otel/conventions/providers/mastra.py b/products/llm_analytics/backend/api/otel/conventions/providers/mastra.py new file mode 100644 index 000000000000..f5436ccb121d --- /dev/null +++ b/products/llm_analytics/backend/api/otel/conventions/providers/mastra.py @@ -0,0 +1,129 @@ +""" +Mastra provider transformer. + +Handles Mastra's OTEL format which wraps messages in custom structures: +- Input: {"messages": [{"role": "user", "content": [...]}]} +- Output: {"files": [], "text": "...", "warnings": [], ...} + +Provider Behavior Notes: +------------------------ +Mastra uses the @mastra/otel instrumentation scope and sends OTEL data in v1 pattern +(all data in span attributes, no separate log events). + +Key characteristic: Mastra does NOT accumulate conversation history across calls. +Each `agent.generate()` call creates a separate, independent trace containing only +that turn's input (system message + current user message) and output. This means: + +- A 4-turn conversation produces 4 separate traces +- Turn 4's trace only shows "Thanks, bye!" as input, not previous turns +- To see full conversation context, users must look at the sequence of traces + +This is expected Mastra behavior, not a limitation of our ingestion. The framework +treats each generate() call as an independent operation. +""" + +import json +from typing import Any + +from .base import OtelInstrumentationPattern, ProviderTransformer + + +class MastraTransformer(ProviderTransformer): + """ + Transform Mastra's OTEL format to PostHog standard format. + + Mastra uses @mastra/otel instrumentation scope and wraps messages + in custom structures that need unwrapping. + """ + + def can_handle(self, span: dict[str, Any], scope: dict[str, Any]) -> bool: + """ + Detect Mastra by instrumentation scope name. + + Mastra sets scope.name to "@mastra/otel" in its span converter. + """ + scope_name = scope.get("name", "") + + # Primary detection: instrumentation scope + if scope_name == "@mastra/otel": + return True + + # Fallback: check for mastra-prefixed attributes + attributes = span.get("attributes", {}) + return any(key.startswith("mastra.") for key in attributes.keys()) + + def get_instrumentation_pattern(self) -> OtelInstrumentationPattern: + """Mastra uses v1 pattern - all data in span attributes.""" + return OtelInstrumentationPattern.V1_ATTRIBUTES + + def transform_prompt(self, prompt: Any) -> Any: + """ + Transform Mastra's wrapped input format. 
+ + Mastra wraps messages as: {"messages": [{"role": "user", "content": [...]}]} + where content can be an array of objects like [{"type": "text", "text": "..."}] + """ + if not isinstance(prompt, str): + return None # No transformation needed + + try: + parsed = json.loads(prompt) + + # Check for Mastra input format: {"messages": [...]} + if not isinstance(parsed, dict) or "messages" not in parsed: + return None # Not Mastra format + + messages = parsed["messages"] + if not isinstance(messages, list): + return None + + # Transform Mastra messages to standard format + result = [] + for msg in messages: + if not isinstance(msg, dict) or "role" not in msg: + continue + + # Handle Mastra's content array format: [{"type": "text", "text": "..."}] + if "content" in msg and isinstance(msg["content"], list): + text_parts = [] + for content_item in msg["content"]: + if isinstance(content_item, dict): + if content_item.get("type") == "text" and "text" in content_item: + text_parts.append(content_item["text"]) + + if text_parts: + result.append({"role": msg["role"], "content": " ".join(text_parts)}) + else: + # Keep as-is if we can't extract text + result.append(msg) + else: + # Standard format message + result.append(msg) + + return result if result else None + + except (json.JSONDecodeError, TypeError, KeyError): + return None + + def transform_completion(self, completion: Any) -> Any: + """ + Transform Mastra's wrapped output format. + + Mastra wraps output as: {"files": [], "text": "...", "warnings": [], ...} + Extract just the text content. + """ + if not isinstance(completion, str): + return None # No transformation needed + + try: + parsed = json.loads(completion) + + # Check for Mastra output format: {"text": "...", ...} + if not isinstance(parsed, dict) or "text" not in parsed: + return None # Not Mastra format + + # Extract text content as assistant message + return [{"role": "assistant", "content": parsed["text"]}] + + except (json.JSONDecodeError, TypeError, KeyError): + return None diff --git a/products/llm_analytics/backend/api/otel/conventions/providers/test_mastra.py b/products/llm_analytics/backend/api/otel/conventions/providers/test_mastra.py new file mode 100644 index 000000000000..b6da88cb4be2 --- /dev/null +++ b/products/llm_analytics/backend/api/otel/conventions/providers/test_mastra.py @@ -0,0 +1,166 @@ +""" +Tests for Mastra provider transformer. 
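+
+Run with pytest, or execute the file directly (the __main__ block calls pytest.main).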
+""" + +import json + +import pytest + +from products.llm_analytics.backend.api.otel.conventions.providers.mastra import MastraTransformer + + +class TestMastraTransformer: + """Test Mastra-specific format transformations.""" + + def setup_method(self): + """Setup test instance.""" + self.transformer = MastraTransformer() + + def test_can_handle_mastra_scope(self): + """Test detection by @mastra/otel scope name.""" + span = {"attributes": {}} + scope = {"name": "@mastra/otel", "version": "1.0.0"} + + assert self.transformer.can_handle(span, scope) is True + + def test_can_handle_mastra_attributes(self): + """Test detection by mastra.* attributes.""" + span = { + "attributes": { + "mastra.trace_id": "abc123", + "mastra.span_id": "def456", + } + } + scope = {"name": "other"} + + assert self.transformer.can_handle(span, scope) is True + + def test_cannot_handle_non_mastra(self): + """Test that non-Mastra spans are not handled.""" + span = {"attributes": {"gen_ai.system": "openai"}} + scope = {"name": "opentelemetry-instrumentation-openai"} + + assert self.transformer.can_handle(span, scope) is False + + def test_transform_prompt_simple_messages(self): + """Test transforming Mastra's wrapped messages format.""" + mastra_input = json.dumps( + { + "messages": [ + {"role": "system", "content": "You are a helpful assistant"}, + {"role": "user", "content": "Hello"}, + ] + } + ) + + result = self.transformer.transform_prompt(mastra_input) + + assert result is not None + assert len(result) == 2 + assert result[0] == {"role": "system", "content": "You are a helpful assistant"} + assert result[1] == {"role": "user", "content": "Hello"} + + def test_transform_prompt_with_content_array(self): + """Test transforming Mastra's content array format.""" + mastra_input = json.dumps( + { + "messages": [ + { + "role": "user", + "content": [ + {"type": "text", "text": "What's the weather"}, + {"type": "text", "text": " in Paris?"}, + ], + } + ] + } + ) + + result = self.transformer.transform_prompt(mastra_input) + + assert result is not None + assert len(result) == 1 + assert result[0]["role"] == "user" + assert result[0]["content"] == "What's the weather in Paris?" 
+ + def test_transform_prompt_non_mastra_format(self): + """Test that non-Mastra formats return None.""" + # Standard GenAI format (already correct) + standard_input = json.dumps([{"role": "user", "content": "Hello"}]) + + result = self.transformer.transform_prompt(standard_input) + + assert result is None # No transformation needed + + def test_transform_prompt_invalid_json(self): + """Test handling of invalid JSON.""" + result = self.transformer.transform_prompt("not valid json") + + assert result is None + + def test_transform_prompt_non_string(self): + """Test handling of non-string input.""" + result = self.transformer.transform_prompt(["already", "a", "list"]) + + assert result is None + + def test_transform_completion_text_format(self): + """Test transforming Mastra's output format.""" + mastra_output = json.dumps( + {"files": [], "text": "The weather in Paris is sunny.", "warnings": [], "reasoning": [], "sources": []} + ) + + result = self.transformer.transform_completion(mastra_output) + + assert result is not None + assert len(result) == 1 + assert result[0] == {"role": "assistant", "content": "The weather in Paris is sunny."} + + def test_transform_completion_non_mastra_format(self): + """Test that non-Mastra output formats return None.""" + standard_output = json.dumps([{"role": "assistant", "content": "Hello"}]) + + result = self.transformer.transform_completion(standard_output) + + assert result is None # No transformation needed + + def test_transform_completion_invalid_json(self): + """Test handling of invalid JSON in completion.""" + result = self.transformer.transform_completion("not valid json") + + assert result is None + + def test_transform_completion_non_string(self): + """Test handling of non-string completion.""" + result = self.transformer.transform_completion({"already": "dict"}) + + assert result is None + + def test_end_to_end_conversation(self): + """Test a full conversation flow with Mastra format.""" + # Simulate what Mastra sends + prompt = json.dumps( + { + "messages": [ + {"role": "system", "content": "You are helpful"}, + {"role": "user", "content": [{"type": "text", "text": "Hi there!"}]}, + ] + } + ) + + completion = json.dumps({"text": "Hello! How can I help you?", "files": [], "warnings": []}) + + # Transform + prompt_result = self.transformer.transform_prompt(prompt) + completion_result = self.transformer.transform_completion(completion) + + # Verify + assert prompt_result == [ + {"role": "system", "content": "You are helpful"}, + {"role": "user", "content": "Hi there!"}, + ] + assert completion_result == [{"role": "assistant", "content": "Hello! How can I help you?"}] + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/products/llm_analytics/backend/api/otel/event_merger.py b/products/llm_analytics/backend/api/otel/event_merger.py new file mode 100644 index 000000000000..b83b74a5e0c6 --- /dev/null +++ b/products/llm_analytics/backend/api/otel/event_merger.py @@ -0,0 +1,118 @@ +""" +Event merger for combining OTEL log and trace events using Redis. + +V2 instrumentation sends traces (with metadata) and logs (with message content) +as separate HTTP requests. The order varies - sometimes traces first, sometimes logs first. + +This module uses Redis to cache and merge them bidirectionally with NO BLOCKING: +- First arrival (trace OR log): Cache in Redis, don't send +- Second arrival (log OR trace): Find cached partner, merge, and send + +This eliminates the thread starvation problem of blocking with time.sleep(). 
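+
+Cache keys (both expire after the 60-second TTL):
+- otel_merge:trace:{trace_id}:{span_id}
+- otel_merge:logs:{trace_id}:{span_id}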
+""" + +import json +import logging +from typing import Any, Optional + +from posthog.redis import get_client + +logger = logging.getLogger(__name__) + +_CACHE_TTL = 60 # seconds - Redis will auto-expire after this + + +def cache_and_merge_properties( + trace_id: str, span_id: str, properties: dict[str, Any], is_trace: bool = True +) -> Optional[dict[str, Any]]: + """ + Cache properties and merge with any existing cached properties for the same span. + + Uses separate cache keys for traces and logs to properly accumulate multiple log events: + - Trace cache: otel_merge:trace:{trace_id}:{span_id} + - Logs cache: otel_merge:logs:{trace_id}:{span_id} + + Flow: + 1. Logs accumulate in logs cache (multiple logs merge together) + 2. When trace arrives, merge all accumulated logs with trace and send + 3. When log arrives after trace, merge immediately and send + + Args: + trace_id: Trace ID + span_id: Span ID + properties: Properties dict to cache/merge + is_trace: True if trace properties, False if log properties + + Returns: + - None if waiting for more data (cached, not sent) + - Merged properties if ready to send (send to capture) + """ + redis_client = get_client() + trace_cache_key = f"otel_merge:trace:{trace_id}:{span_id}" + logs_cache_key = f"otel_merge:logs:{trace_id}:{span_id}" + + try: + if is_trace: + # Trace arriving - check if logs are already cached + logs_json = redis_client.get(logs_cache_key) + + if logs_json: + # Logs already cached - merge and send + logs_properties = json.loads(logs_json) + merged = {**logs_properties, **properties} # Trace props override + redis_client.delete(logs_cache_key) + return merged + else: + # No logs yet - cache trace + redis_client.setex(trace_cache_key, _CACHE_TTL, json.dumps(properties)) + return None + else: + # Log arriving - accumulate with other logs first + logs_json = redis_client.get(logs_cache_key) + + if logs_json: + # Another log already cached - accumulate + existing_logs = json.loads(logs_json) + merged_logs = {**existing_logs, **properties} # Later log props override + redis_client.setex(logs_cache_key, _CACHE_TTL, json.dumps(merged_logs)) + + # Check if trace is ready + trace_json = redis_client.get(trace_cache_key) + if trace_json: + # Trace is ready - merge and send + trace_properties = json.loads(trace_json) + final_merged = {**merged_logs, **trace_properties} # Trace props override + redis_client.delete(logs_cache_key) + redis_client.delete(trace_cache_key) + return final_merged + + # Trace not ready yet - wait for it + return None + else: + # First log - check if trace is already cached + trace_json = redis_client.get(trace_cache_key) + + if trace_json: + # Trace already cached - merge and send + trace_properties = json.loads(trace_json) + merged = {**properties, **trace_properties} # Trace props override + redis_client.delete(trace_cache_key) + return merged + else: + # No trace yet - cache this log + redis_client.setex(logs_cache_key, _CACHE_TTL, json.dumps(properties)) + return None + + except Exception as e: + # Redis error - log and fall back to sending immediately + logger.exception( + f"event_merger_error: Redis error during merge, sending immediately", + extra={ + "trace_id": trace_id, + "span_id": span_id, + "error": str(e), + "is_trace": is_trace, + }, + ) + # Fallback: send immediately without merging + return properties diff --git a/products/llm_analytics/backend/api/otel/ingestion.py b/products/llm_analytics/backend/api/otel/ingestion.py new file mode 100644 index 000000000000..be648c0ad9c8 --- /dev/null +++ 
b/products/llm_analytics/backend/api/otel/ingestion.py
@@ -0,0 +1,739 @@
+"""
+OpenTelemetry traces and logs ingestion API endpoints.
+
+Accepts OTLP/HTTP (protobuf) format traces and logs and converts them to PostHog AI events.
+
+Supports both OpenAI instrumentation versions:
+- v1 (opentelemetry-instrumentation-openai): Sends everything as trace span attributes
+- v2 (opentelemetry-instrumentation-openai-v2): Sends metadata as spans, message content as logs
+
+Endpoints:
+- POST /api/projects/:project_id/ai/otel/traces - Required for all instrumentation
+- POST /api/projects/:project_id/ai/otel/logs - Required for v2 instrumentation with message content
+
+Content-Type: application/x-protobuf
+Authorization: Bearer <project_api_key>
+
+Authentication uses project API token (phc_...), NOT personal API key.
+Token can be provided via Authorization header or ?token= query parameter.
+"""
+
+import re
+from typing import Any, Optional, Union
+
+from django.http import HttpRequest
+
+import structlog
+from drf_spectacular.utils import extend_schema
+from rest_framework import authentication, status
+from rest_framework.decorators import api_view, authentication_classes, permission_classes
+from rest_framework.exceptions import AuthenticationFailed, ValidationError
+from rest_framework.request import Request
+from rest_framework.response import Response
+
+from posthog.api.capture import capture_batch_internal
+from posthog.models import Team
+
+from .logs_parser import parse_otlp_logs_request
+from .logs_transformer import (
+    build_event_properties,
+    calculate_timestamp,
+    determine_event_type,
+    extract_distinct_id,
+    transform_log_to_ai_event,
+)
+from .parser import parse_baggage_header, parse_otlp_request
+from .transformer import transform_span_to_ai_event
+
+logger = structlog.get_logger(__name__)
+
+# OpenTelemetry limits (aligned with plugin-server validation)
+OTEL_LIMITS = {
+    "MAX_SPANS_PER_REQUEST": 1000,
+    "MAX_ATTRIBUTES_PER_SPAN": 128,
+    "MAX_EVENTS_PER_SPAN": 128,
+    "MAX_LINKS_PER_SPAN": 128,
+    "MAX_ATTRIBUTE_VALUE_LENGTH": 100_000,  # 100KB
+    "MAX_SPAN_NAME_LENGTH": 1024,
+    "MAX_LOGS_PER_REQUEST": 1000,
+    "MAX_ATTRIBUTES_PER_LOG": 128,
+    "MAX_LOG_BODY_LENGTH": 100_000,  # 100KB
+}
+
+
+class ProjectTokenAuthentication(authentication.BaseAuthentication):
+    """
+    Authenticates using a project API token (phc_...).
+
+    This is used for ingestion endpoints where a public project token is used
+    instead of a personal API key. Supports token in:
+    1. Authorization header: Bearer <token>
+    2. Query parameter: ?token=<token>
+
+    Similar to logs ingestion pattern.
+ """ + + keyword = "Bearer" + + @classmethod + def find_token( + cls, + request: Union[HttpRequest, Request], + ) -> Optional[str]: + """Try to find project token in request and return it.""" + # Try Authorization header first + if "HTTP_AUTHORIZATION" in request.META: + authorization_match = re.match(rf"^{cls.keyword}\s+(\S.+)$", request.META["HTTP_AUTHORIZATION"]) + if authorization_match: + token = authorization_match.group(1).strip() + # Only accept project tokens (phc_...), not personal keys + if token.startswith("phc_"): + return token + return None + + # Try query parameter + if "token" in request.GET: + token = request.GET["token"] + if token.startswith("phc_"): + return token + + return None + + def authenticate(self, request: Union[HttpRequest, Request]) -> Optional[tuple[Any, Team]]: + token = self.find_token(request) + + if not token: + return None + + # Get the team from the project token + team = Team.objects.get_team_from_cache_or_token(token) + + if team is None: + raise AuthenticationFailed(detail="Invalid project token.") + + # Return team as the "user" for this authentication + # The team itself acts as the authenticated entity + return (team, token) + + @classmethod + def authenticate_header(cls, request) -> str: + return cls.keyword + + +@extend_schema( + description=""" + OpenTelemetry traces ingestion endpoint for LLM Analytics. + + Accepts OTLP/HTTP (protobuf) format traces following the OpenTelemetry Protocol specification. + Converts OTel spans to PostHog AI events using PostHog-native and GenAI semantic conventions. + + Supported conventions: + - PostHog native: posthog.ai.* attributes (highest priority) + - GenAI semantic conventions: gen_ai.* attributes (fallback) + + Example OTel SDK configuration: + ```python + from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter + + exporter = OTLPSpanExporter( + endpoint="https://app.posthog.com/api/projects/{project_id}/ai/otel/traces", + headers={"Authorization": "Bearer phc_your_project_token"} + ) + ``` + + Authentication: + - Use your project API token (starts with phc_...), NOT a personal API key + - Token can be provided via Authorization header (Bearer token) or ?token= query parameter + + Rate limits and quotas apply as per normal PostHog event ingestion. + """, + request={"application/x-protobuf": bytes}, + responses={ + 200: {"description": "Traces accepted for processing"}, + 400: {"description": "Invalid OTLP format or validation errors"}, + 401: {"description": "Authentication failed"}, + 413: {"description": "Request too large (exceeds span/attribute limits)"}, + }, +) +@api_view(["POST"]) +@authentication_classes([ProjectTokenAuthentication]) +@permission_classes([]) +def otel_traces_endpoint(request: HttpRequest, project_id: int) -> Response: + """ + Process OTLP trace export requests. + + This endpoint: + 1. Validates authentication and project access + 2. Parses OTLP protobuf payload + 3. Validates against size/span limits + 4. Transforms OTel spans to PostHog AI events + 5. Routes events to capture pipeline for ingestion + """ + + # Get authenticated team from request + # ProjectTokenAuthentication returns (team, token) tuple + if not hasattr(request, "user") or not isinstance(request.user, Team): + return Response( + { + "error": "Invalid authentication. Use project token (phc_...) in Authorization header or ?token= parameter." 
+ }, + status=status.HTTP_401_UNAUTHORIZED, + ) + + team = request.user + + # Verify the team ID matches the project_id in URL + if team.id != project_id: + return Response( + {"error": "Project ID in URL does not match authenticated project token"}, + status=status.HTTP_403_FORBIDDEN, + ) + + # Check content type + content_type = request.content_type or "" + if "protobuf" not in content_type and "octet-stream" not in content_type: + return Response( + { + "error": f"Invalid content type: {content_type}. Expected application/x-protobuf or application/octet-stream" + }, + status=status.HTTP_400_BAD_REQUEST, + ) + + # Get raw protobuf body + protobuf_data = request.body + + if not protobuf_data: + return Response( + {"error": "Empty request body"}, + status=status.HTTP_400_BAD_REQUEST, + ) + + try: + # Parse baggage from headers (for session context) + baggage_header = request.headers.get("baggage") + baggage = parse_baggage_header(baggage_header) if baggage_header else {} + + # Parse OTLP protobuf + parsed_request = parse_otlp_trace_request(protobuf_data) + + # Validate request + validation_errors = validate_otlp_request(parsed_request) + if validation_errors: + logger.warning( + "otel_traces_validation_failed", + team_id=team.id, + errors=validation_errors, + ) + return Response( + {"error": "Validation failed", "details": validation_errors}, + status=status.HTTP_400_BAD_REQUEST, + ) + + # Transform spans to AI events + events = transform_spans_to_ai_events(parsed_request, baggage) + + # Route to capture pipeline + capture_events(events, team) + + return Response( + { + "status": "success", + "message": "Traces ingested successfully", + "spans_received": len(parsed_request), + "events_created": len(events), + }, + status=status.HTTP_200_OK, + ) + + except ValidationError as e: + logger.warning( + "otel_traces_validation_error", + team_id=team.id, + error=str(e), + ) + return Response( + {"error": str(e)}, + status=status.HTTP_400_BAD_REQUEST, + ) + + except Exception as e: + logger.error( + "otel_traces_processing_error", + team_id=team.id, + error=str(e), + exc_info=True, + ) + return Response( + {"error": "Internal server error processing traces"}, + status=status.HTTP_500_INTERNAL_SERVER_ERROR, + ) + + +def parse_otlp_trace_request(protobuf_data: bytes) -> list[dict[str, Any]]: + """ + Parse OTLP ExportTraceServiceRequest from protobuf bytes. + + Returns list of dicts, each containing: + - span: parsed span dict + - resource: dict of resource attributes for this span + - scope: dict of instrumentation scope info for this span + """ + return parse_otlp_request(protobuf_data) + + +def validate_otlp_request(parsed_items: list[dict[str, Any]]) -> list[dict[str, Any]]: + """ + Validate OTLP request against limits. + + Returns list of validation errors (empty if valid). + """ + errors = [] + + # Check span count + if len(parsed_items) > OTEL_LIMITS["MAX_SPANS_PER_REQUEST"]: + errors.append( + { + "field": "request.spans", + "value": len(parsed_items), + "limit": OTEL_LIMITS["MAX_SPANS_PER_REQUEST"], + "message": f"Request contains {len(parsed_items)} spans, maximum is {OTEL_LIMITS['MAX_SPANS_PER_REQUEST']}. 
Configure batch size in your OTel SDK (e.g., OTEL_BSP_MAX_EXPORT_BATCH_SIZE).", + } + ) + + # Validate each span + for i, item in enumerate(parsed_items): + span = item.get("span", {}) + + # Check span name length + span_name = span.get("name", "") + if len(span_name) > OTEL_LIMITS["MAX_SPAN_NAME_LENGTH"]: + errors.append( + { + "field": f"span[{i}].name", + "value": len(span_name), + "limit": OTEL_LIMITS["MAX_SPAN_NAME_LENGTH"], + "message": f"Span name exceeds {OTEL_LIMITS['MAX_SPAN_NAME_LENGTH']} characters.", + } + ) + + # Check attribute count + attributes = span.get("attributes", {}) + if len(attributes) > OTEL_LIMITS["MAX_ATTRIBUTES_PER_SPAN"]: + errors.append( + { + "field": f"span[{i}].attributes", + "value": len(attributes), + "limit": OTEL_LIMITS["MAX_ATTRIBUTES_PER_SPAN"], + "message": f"Span has {len(attributes)} attributes, maximum is {OTEL_LIMITS['MAX_ATTRIBUTES_PER_SPAN']}.", + } + ) + + # Check attribute value sizes + for key, value in attributes.items(): + if isinstance(value, str) and len(value) > OTEL_LIMITS["MAX_ATTRIBUTE_VALUE_LENGTH"]: + errors.append( + { + "field": f"span[{i}].attributes.{key}", + "value": len(value), + "limit": OTEL_LIMITS["MAX_ATTRIBUTE_VALUE_LENGTH"], + "message": f"Attribute '{key}' exceeds {OTEL_LIMITS['MAX_ATTRIBUTE_VALUE_LENGTH']} bytes ({len(value)} bytes). Consider reducing payload size.", + } + ) + + # Check event count + events = span.get("events", []) + if len(events) > OTEL_LIMITS["MAX_EVENTS_PER_SPAN"]: + errors.append( + { + "field": f"span[{i}].events", + "value": len(events), + "limit": OTEL_LIMITS["MAX_EVENTS_PER_SPAN"], + "message": f"Span has {len(events)} events, maximum is {OTEL_LIMITS['MAX_EVENTS_PER_SPAN']}.", + } + ) + + # Check link count + links = span.get("links", []) + if len(links) > OTEL_LIMITS["MAX_LINKS_PER_SPAN"]: + errors.append( + { + "field": f"span[{i}].links", + "value": len(links), + "limit": OTEL_LIMITS["MAX_LINKS_PER_SPAN"], + "message": f"Span has {len(links)} links, maximum is {OTEL_LIMITS['MAX_LINKS_PER_SPAN']}.", + } + ) + + return errors + + +def transform_spans_to_ai_events(parsed_items: list[dict[str, Any]], baggage: dict[str, str]) -> list[dict[str, Any]]: + """ + Transform OTel spans to PostHog AI events. + + Uses waterfall pattern for attribute extraction: + 1. PostHog native (posthog.ai.*) + 2. GenAI semantic conventions (gen_ai.*) + + Each item contains its own span, resource, and scope context to correctly + handle requests with multiple resource_spans/scope_spans. + + Note: Returns only events ready to send. Events that are first arrivals + (cached, waiting for logs) are filtered out. + """ + events = [] + for item in parsed_items: + span = item.get("span", {}) + resource = item.get("resource", {}) + scope = item.get("scope", {}) + + event = transform_span_to_ai_event(span, resource, scope, baggage) + if event is not None: # Filter out first arrivals (cached, waiting for logs) + events.append(event) + + return events + + +def capture_events(events: list[dict[str, Any]], team: Team) -> None: + """ + Route transformed events to PostHog capture pipeline. + + Uses capture_batch_internal to submit events to capture-rs. + Events are submitted concurrently for better performance. 
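+
+    Minimal usage sketch (names as defined in this module; illustrative only):
+
+        events = transform_spans_to_ai_events(parsed_request, baggage)
+        capture_events(events, team)  # failures are logged as warnings, not raised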
+ """ + if not events: + return + + # Submit events to capture pipeline + futures = capture_batch_internal( + events=events, + event_source="otel_traces_ingestion", + token=team.api_token, + process_person_profile=False, # AI events don't need person processing + ) + + # Wait for all futures to complete and check for errors + errors = [] + for i, future in enumerate(futures): + try: + response = future.result() + if response.status_code not in (200, 201): + errors.append(f"Event {i}: HTTP {response.status_code}") + except Exception as e: + errors.append(f"Event {i}: {str(e)}") + + if errors: + logger.warning( + "otel_traces_capture_errors", + team_id=team.id, + error_count=len(errors), + errors=errors[:10], # Log first 10 errors + ) + + +@extend_schema( + description=""" + OpenTelemetry logs ingestion endpoint for LLM Analytics. + + Accepts OTLP/HTTP (protobuf) format logs following the OpenTelemetry Protocol specification. + Converts OTel log records to PostHog AI events. Logs from GenAI instrumentation typically + contain message content (prompts/completions) in the body field. + + Supported conventions: + - GenAI semantic conventions: gen_ai.* attributes + - Generic OTel log attributes + + Example OTel SDK configuration: + ```python + from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter + + exporter = OTLPLogExporter( + endpoint="https://app.posthog.com/api/projects/{project_id}/ai/otel/logs", + headers={"Authorization": "Bearer phc_your_project_token"} + ) + ``` + + Authentication: + - Use your project API token (starts with phc_...), NOT a personal API key + - Token can be provided via Authorization header (Bearer token) or ?token= query parameter + + Rate limits and quotas apply as per normal PostHog event ingestion. + """, + request={"application/x-protobuf": bytes}, + responses={ + 200: {"description": "Logs accepted for processing"}, + 400: {"description": "Invalid OTLP format or validation errors"}, + 401: {"description": "Authentication failed"}, + 413: {"description": "Request too large (exceeds log/attribute limits)"}, + }, +) +@api_view(["POST"]) +@authentication_classes([ProjectTokenAuthentication]) +@permission_classes([]) +def otel_logs_endpoint(request: HttpRequest, project_id: int) -> Response: + """ + Process OTLP logs export requests. + + This endpoint: + 1. Validates authentication and project access + 2. Parses OTLP protobuf payload + 3. Validates against size/log limits + 4. Transforms OTel log records to PostHog AI events + 5. Routes events to capture pipeline for ingestion + """ + + # Get authenticated team from request + # ProjectTokenAuthentication returns (team, token) tuple + if not hasattr(request, "user") or not isinstance(request.user, Team): + return Response( + { + "error": "Invalid authentication. Use project token (phc_...) in Authorization header or ?token= parameter." + }, + status=status.HTTP_401_UNAUTHORIZED, + ) + + team = request.user + + # Verify the team ID matches the project_id in URL + if team.id != project_id: + return Response( + {"error": "Project ID in URL does not match authenticated project token"}, + status=status.HTTP_403_FORBIDDEN, + ) + + # Check content type + content_type = request.content_type or "" + if "protobuf" not in content_type and "octet-stream" not in content_type: + return Response( + { + "error": f"Invalid content type: {content_type}. 
Expected application/x-protobuf or application/octet-stream" + }, + status=status.HTTP_400_BAD_REQUEST, + ) + + # Get raw protobuf body + protobuf_data = request.body + + if not protobuf_data: + return Response( + {"error": "Empty request body"}, + status=status.HTTP_400_BAD_REQUEST, + ) + + try: + # Parse OTLP protobuf + parsed_request = parse_otlp_logs_request(protobuf_data) + + # Validate request + validation_errors = validate_otlp_logs_request(parsed_request) + if validation_errors: + logger.warning( + "otel_logs_validation_failed", + team_id=team.id, + errors=validation_errors, + ) + return Response( + {"error": "Validation failed", "details": validation_errors}, + status=status.HTTP_400_BAD_REQUEST, + ) + + # Transform logs to AI events (also caches properties for merging with traces) + events = transform_logs_to_ai_events(parsed_request) + + # Route merged events to capture pipeline + capture_events(events, team) + + return Response( + { + "status": "success", + "message": "Logs ingested successfully", + "logs_received": len(parsed_request), + "events_created": len(events), + }, + status=status.HTTP_200_OK, + ) + + except ValidationError as e: + logger.warning( + "otel_logs_validation_error", + team_id=team.id, + error=str(e), + ) + return Response( + {"error": str(e)}, + status=status.HTTP_400_BAD_REQUEST, + ) + + except Exception as e: + logger.error( + "otel_logs_processing_error", + team_id=team.id, + error=str(e), + exc_info=True, + ) + return Response( + {"error": "Internal server error processing logs"}, + status=status.HTTP_500_INTERNAL_SERVER_ERROR, + ) + + +def validate_otlp_logs_request(parsed_items: list[dict[str, Any]]) -> list[dict[str, Any]]: + """ + Validate OTLP logs request against limits. + + Returns list of validation errors (empty if valid). + """ + errors = [] + + # Check log count + if len(parsed_items) > OTEL_LIMITS["MAX_LOGS_PER_REQUEST"]: + errors.append( + { + "field": "request.logs", + "value": len(parsed_items), + "limit": OTEL_LIMITS["MAX_LOGS_PER_REQUEST"], + "message": f"Request contains {len(parsed_items)} logs, maximum is {OTEL_LIMITS['MAX_LOGS_PER_REQUEST']}. Configure batch size in your OTel SDK.", + } + ) + + # Validate each log record + for i, item in enumerate(parsed_items): + log_record = item.get("log", {}) + + # Check attribute count + attributes = log_record.get("attributes", {}) + if len(attributes) > OTEL_LIMITS["MAX_ATTRIBUTES_PER_LOG"]: + errors.append( + { + "field": f"log[{i}].attributes", + "value": len(attributes), + "limit": OTEL_LIMITS["MAX_ATTRIBUTES_PER_LOG"], + "message": f"Log has {len(attributes)} attributes, maximum is {OTEL_LIMITS['MAX_ATTRIBUTES_PER_LOG']}.", + } + ) + + # Check body size + body = log_record.get("body") + if body and isinstance(body, str) and len(body) > OTEL_LIMITS["MAX_LOG_BODY_LENGTH"]: + errors.append( + { + "field": f"log[{i}].body", + "value": len(body), + "limit": OTEL_LIMITS["MAX_LOG_BODY_LENGTH"], + "message": f"Log body exceeds {OTEL_LIMITS['MAX_LOG_BODY_LENGTH']} bytes ({len(body)} bytes). 
Consider reducing payload size.", + } + ) + + # Check attribute value sizes + for key, value in attributes.items(): + if isinstance(value, str) and len(value) > OTEL_LIMITS["MAX_ATTRIBUTE_VALUE_LENGTH"]: + errors.append( + { + "field": f"log[{i}].attributes.{key}", + "value": len(value), + "limit": OTEL_LIMITS["MAX_ATTRIBUTE_VALUE_LENGTH"], + "message": f"Attribute '{key}' exceeds {OTEL_LIMITS['MAX_ATTRIBUTE_VALUE_LENGTH']} bytes ({len(value)} bytes).", + } + ) + + return errors + + +def transform_logs_to_ai_events(parsed_items: list[dict[str, Any]]) -> list[dict[str, Any]]: + """ + Transform OTel log records to PostHog AI events. + + CRITICAL: In v2 instrumentation, multiple log events (user message, assistant message, etc.) + arrive in the SAME HTTP request. We must accumulate ALL logs for the same span BEFORE + calling the event merger to avoid race conditions where the trace consumes partial logs. + + Each item contains its own log, resource, and scope context to correctly + handle requests with multiple resource_logs/scope_logs. + + Note: Returns only events ready to send. Events that are first arrivals + (cached, waiting for traces) are filtered out. + """ + # Group logs by (trace_id, span_id) to accumulate them before merging + # Store full items (with resource/scope) not just log records + from collections import defaultdict + + logs_by_span: dict[tuple[str | None, str | None], list[dict[str, Any]]] = defaultdict(list) + + for item in parsed_items: + log_record = item.get("log", {}) + trace_id = log_record.get("trace_id", "") + span_id = log_record.get("span_id", "") + + if trace_id and span_id: + logs_by_span[(trace_id, span_id)].append(item) + else: + # No trace/span ID - process individually + logs_by_span[(None, None)].append(item) + + events = [] + + # Process each span's logs together + for (trace_id, span_id), span_items in logs_by_span.items(): + if trace_id and span_id: + # Accumulate properties from all logs for this span + # Use the first item's resource/scope (logs from same span should have same context) + first_item = span_items[0] + resource = first_item.get("resource", {}) + scope = first_item.get("scope", {}) + + accumulated_props = {} + for item in span_items: + log_record = item.get("log", {}) + # Use each log's own resource/scope for property building + item_resource = item.get("resource", {}) + item_scope = item.get("scope", {}) + props = build_event_properties(log_record, log_record.get("attributes", {}), item_resource, item_scope) + # Merge properties with special handling for arrays + for key, value in props.items(): + if key in ("$ai_input", "$ai_output_choices"): + # Concatenate message arrays instead of overwriting + if key in accumulated_props and isinstance(accumulated_props[key], list): + accumulated_props[key] = accumulated_props[key] + value + else: + accumulated_props[key] = value + else: + # For non-array fields, later values override earlier ones + accumulated_props[key] = value + + # Now call event merger once with all accumulated properties + from .event_merger import cache_and_merge_properties + + merged = cache_and_merge_properties(trace_id, span_id, accumulated_props, is_trace=False) + + if merged is not None: + # Ready to send - create event + first_log = first_item.get("log", {}) + event_type = determine_event_type(first_log, first_log.get("attributes", {})) + timestamp = calculate_timestamp(first_log) + distinct_id = extract_distinct_id(resource, first_log.get("attributes", {})) + + # Generate consistent UUID from trace_id + span_id + 
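# uuid5 is deterministic: the same (trace_id, span_id) pair always
+                # yields the same UUID, so the log-derived and trace-derived
+                # events for one span deduplicate downstream instead of
+                # double-counting. +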
import uuid + + namespace = uuid.UUID("00000000-0000-0000-0000-000000000000") + event_uuid = str(uuid.uuid5(namespace, f"{trace_id}:{span_id}")) + + event = { + "event": event_type, + "distinct_id": distinct_id, + "timestamp": timestamp, + "properties": merged, + "uuid": event_uuid, + } + events.append(event) + else: + # No trace/span ID - process logs individually (shouldn't happen in normal v2) + for item in span_items: + log_record = item.get("log", {}) + resource = item.get("resource", {}) + scope = item.get("scope", {}) + event = transform_log_to_ai_event(log_record, resource, scope) + if event is not None: + events.append(event) + + return events diff --git a/products/llm_analytics/backend/api/otel/logs_parser.py b/products/llm_analytics/backend/api/otel/logs_parser.py new file mode 100644 index 000000000000..d7aa7f29085d --- /dev/null +++ b/products/llm_analytics/backend/api/otel/logs_parser.py @@ -0,0 +1,81 @@ +""" +OTLP protobuf parser for OpenTelemetry logs. + +Parses ExportLogsServiceRequest protobuf messages and extracts log records, +resource attributes, and instrumentation scope information. +""" + +from typing import Any + +from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import ExportLogsServiceRequest +from opentelemetry.proto.logs.v1.logs_pb2 import LogRecord + +from .parser import parse_any_value, parse_attributes + + +def parse_otlp_logs_request(protobuf_data: bytes) -> list[dict[str, Any]]: + """ + Parse OTLP ExportLogsServiceRequest from protobuf bytes. + + Returns a list of dicts, each containing: + - log: parsed log record dict + - resource: dict of resource attributes for this log + - scope: dict of instrumentation scope info for this log + + Each log carries its own resource/scope context to handle requests + containing multiple resource_logs/scope_logs correctly. + """ + request = ExportLogsServiceRequest() + request.ParseFromString(protobuf_data) + + results = [] + + # OTLP structure: resource_logs -> scope_logs -> log_records + for resource_logs in request.resource_logs: + # Extract resource attributes (service.name, etc.) + resource_attrs = {} + if resource_logs.HasField("resource"): + resource_attrs = parse_attributes(resource_logs.resource.attributes) + + # Iterate through scope logs + for scope_logs in resource_logs.scope_logs: + # Extract instrumentation scope + scope_info = {} + if scope_logs.HasField("scope"): + scope_info = { + "name": scope_logs.scope.name, + "version": scope_logs.scope.version if scope_logs.scope.version else None, + "attributes": parse_attributes(scope_logs.scope.attributes) if scope_logs.scope.attributes else {}, + } + + # Parse each log record with its resource/scope context + for log_record in scope_logs.log_records: + parsed_log = parse_log_record(log_record) + results.append( + { + "log": parsed_log, + "resource": resource_attrs, + "scope": scope_info, + } + ) + + return results + + +def parse_log_record(log_record: LogRecord) -> dict[str, Any]: + """ + Parse a single OTLP log record into a dict. 
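+
+    Times are stringified nanosecond integers; trace_id and span_id are
+    lowercase hex strings, matching the span IDs produced by parser.py, so
+    the v2 event merger can key on (trace_id, span_id) across both streams.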
+ """ + return { + "time_unix_nano": str(log_record.time_unix_nano), + "observed_time_unix_nano": str(log_record.observed_time_unix_nano) + if log_record.observed_time_unix_nano + else None, + "severity_number": log_record.severity_number, + "severity_text": log_record.severity_text if log_record.severity_text else None, + "body": parse_any_value(log_record.body) if log_record.HasField("body") else None, + "attributes": parse_attributes(log_record.attributes), + "trace_id": log_record.trace_id.hex() if log_record.trace_id else None, + "span_id": log_record.span_id.hex() if log_record.span_id else None, + "flags": log_record.flags if log_record.flags else None, + } diff --git a/products/llm_analytics/backend/api/otel/logs_transformer.py b/products/llm_analytics/backend/api/otel/logs_transformer.py new file mode 100644 index 000000000000..fd44bf542e3a --- /dev/null +++ b/products/llm_analytics/backend/api/otel/logs_transformer.py @@ -0,0 +1,313 @@ +""" +Core OTel log record to PostHog AI event transformer. + +Transforms OpenTelemetry log records into PostHog AI events. +Log records from GenAI instrumentation typically contain message content +(prompts/completions) in the body field. +""" + +import json +from datetime import UTC, datetime +from typing import Any + +from .event_merger import cache_and_merge_properties + +OTEL_TRANSFORMER_VERSION = "1.0.0" + + +def transform_log_to_ai_event( + log_record: dict[str, Any], + resource: dict[str, Any], + scope: dict[str, Any], +) -> dict[str, Any] | None: + """ + Transform a single OTel log record to PostHog AI event. + + Args: + log_record: Parsed OTel log record + resource: Resource attributes (service.name, etc.) + scope: Instrumentation scope info + + Returns: + PostHog AI event dict OR None if this is first arrival (cached, waiting for trace): + - event: Event type ($ai_generation, $ai_span, etc.) 
+ - distinct_id: User identifier + - timestamp: ISO 8601 timestamp + - properties: AI event properties + - uuid: Event UUID (for deduplication with trace events) + """ + attributes = log_record.get("attributes", {}) + + # Build AI event properties + properties = build_event_properties(log_record, attributes, resource, scope) + + # True bidirectional merge with Redis (no blocking) + # First arrival caches, second arrival merges and sends + trace_id = log_record.get("trace_id", "") + span_id = log_record.get("span_id", "") + if trace_id and span_id: + merged = cache_and_merge_properties(trace_id, span_id, properties, is_trace=False) + if merged is None: + # This is first arrival - log cached, waiting for trace + # Don't send this event yet + return None + # Second arrival - trace already cached, merged contains complete event + properties = merged + + # Determine event type + event_type = determine_event_type(log_record, attributes) + + # Calculate timestamp + timestamp = calculate_timestamp(log_record) + + # Get distinct_id + distinct_id = extract_distinct_id(resource, attributes) + + # Generate consistent UUID from trace_id + span_id for deduplication + # This allows log events and trace events for the same span to merge + import uuid + + event_uuid = None + if trace_id and span_id: + # Create deterministic UUID from trace_id + span_id + namespace = uuid.UUID("00000000-0000-0000-0000-000000000000") + event_uuid = str(uuid.uuid5(namespace, f"{trace_id}:{span_id}")) + + result = { + "event": event_type, + "distinct_id": distinct_id, + "timestamp": timestamp, + "properties": properties, + } + + if event_uuid: + result["uuid"] = event_uuid + + return result + + +def build_event_properties( + log_record: dict[str, Any], + attributes: dict[str, Any], + resource: dict[str, Any], + scope: dict[str, Any], +) -> dict[str, Any]: + """Build PostHog AI event properties from log record.""" + + # Core identifiers (from log record) + trace_id = log_record.get("trace_id") + span_id = log_record.get("span_id") + + # Session ID (from attributes or resource) + session_id = attributes.get("session_id") or resource.get("session.id") + + # Extract message content from body + body = log_record.get("body") + + # Build base properties + properties: dict[str, Any] = {} + + # Core IDs + if trace_id: + properties["$ai_trace_id"] = trace_id + if span_id: + properties["$ai_span_id"] = span_id + if session_id: + properties["$ai_session_id"] = session_id + + # Model info (from attributes) + if attributes.get("gen_ai.system"): + properties["$ai_provider"] = attributes["gen_ai.system"] + elif attributes.get("model.provider"): + properties["$ai_provider"] = attributes["model.provider"] + + if attributes.get("gen_ai.request.model"): + properties["$ai_model"] = attributes["gen_ai.request.model"] + elif attributes.get("gen_ai.response.model"): + properties["$ai_model"] = attributes["gen_ai.response.model"] + elif attributes.get("model.name"): + properties["$ai_model"] = attributes["model.name"] + + # Tokens (from attributes) + if attributes.get("gen_ai.usage.input_tokens") is not None: + properties["$ai_input_tokens"] = attributes["gen_ai.usage.input_tokens"] + if attributes.get("gen_ai.usage.output_tokens") is not None: + properties["$ai_output_tokens"] = attributes["gen_ai.usage.output_tokens"] + + # Handle v2 structured log events (message content from body) + # v2 instrumentation sends logs with event names like: + # - gen_ai.user.message (body: {"content": "..."}) + # - gen_ai.system.message (body: {"content": "..."}) + # 
- gen_ai.assistant.message (body: {"content": "..."} or {"tool_calls": [...]}) + # - gen_ai.choice (body: {"index": 0, "finish_reason": "stop", "message": {...}}) + event_name = attributes.get("event.name", "").lower() + + if isinstance(body, dict): + # User/system messages: {"content": "..."} + if "gen_ai.user.message" in event_name or "gen_ai.system.message" in event_name: + if "content" in body: + role = "system" if "system" in event_name else "user" + properties["$ai_input"] = [{"role": role, "content": body["content"]}] + + # Assistant messages: {"content": "..."} or {"tool_calls": [...]} + # These are previous messages in conversation history, so they go into $ai_input + elif "gen_ai.assistant.message" in event_name: + message = {"role": "assistant"} + if "content" in body: + message["content"] = body["content"] + if "tool_calls" in body: + message["tool_calls"] = body["tool_calls"] + properties["$ai_input"] = [message] + + # Tool messages: {"content": "...", "id": "tool_call_id"} + # These are tool execution results in conversation history, so they go into $ai_input + elif "gen_ai.tool.message" in event_name: + message = {"role": "tool"} + if "content" in body: + message["content"] = body["content"] + if "id" in body: + message["tool_call_id"] = body["id"] + properties["$ai_input"] = [message] + + # Choice events: {"index": 0, "finish_reason": "stop", "message": {...}} + # This is the CURRENT response, so it goes into $ai_output_choices + elif "gen_ai.choice" in event_name and "message" in body: + message_obj = body["message"] + choice = {"role": message_obj.get("role", "assistant")} + if "content" in message_obj: + choice["content"] = message_obj["content"] + if "tool_calls" in message_obj: + choice["tool_calls"] = message_obj["tool_calls"] + if "finish_reason" in body: + choice["finish_reason"] = body["finish_reason"] + properties["$ai_output_choices"] = [choice] + + # Fallback: Handle gen_ai.prompt/completion from span attributes (v1-style) + if "$ai_input" not in properties and attributes.get("gen_ai.prompt"): + prompt = attributes["gen_ai.prompt"] + if isinstance(prompt, str): + try: + parsed = json.loads(prompt) + properties["$ai_input"] = parsed if isinstance(parsed, list) else [{"role": "user", "content": prompt}] + except (json.JSONDecodeError, TypeError): + properties["$ai_input"] = [{"role": "user", "content": prompt}] + elif isinstance(prompt, list): + properties["$ai_input"] = prompt + elif isinstance(prompt, dict): + properties["$ai_input"] = [prompt] + + if "$ai_output_choices" not in properties and attributes.get("gen_ai.completion"): + completion = attributes["gen_ai.completion"] + if isinstance(completion, str): + try: + parsed = json.loads(completion) + properties["$ai_output_choices"] = ( + parsed if isinstance(parsed, list) else [{"role": "assistant", "content": completion}] + ) + except (json.JSONDecodeError, TypeError): + properties["$ai_output_choices"] = [{"role": "assistant", "content": completion}] + elif isinstance(completion, list): + properties["$ai_output_choices"] = completion + elif isinstance(completion, dict): + properties["$ai_output_choices"] = [completion] + + # Severity + if log_record.get("severity_number"): + properties["$ai_log_severity_number"] = log_record["severity_number"] + if log_record.get("severity_text"): + properties["$ai_log_severity_text"] = log_record["severity_text"] + + # Metadata + properties["$ai_otel_transformer_version"] = OTEL_TRANSFORMER_VERSION + properties["$ai_otel_log_source"] = "logs" + + # Resource attributes 
(service name, etc.) + if resource.get("service.name"): + properties["$ai_service_name"] = resource["service.name"] + + # Instrumentation scope + properties["$ai_instrumentation_scope_name"] = scope.get("name", "unknown") + if scope.get("version"): + properties["$ai_instrumentation_scope_version"] = scope["version"] + + # Add remaining log attributes (not already mapped) + mapped_keys = { + "gen_ai.system", + "gen_ai.request.model", + "gen_ai.response.model", + "gen_ai.usage.input_tokens", + "gen_ai.usage.output_tokens", + "gen_ai.prompt", + "gen_ai.completion", + "model.provider", + "model.name", + "message.content", + "session_id", + "session.id", + "event.name", + "service.name", + } + + for key, value in attributes.items(): + if key not in mapped_keys and not key.startswith("gen_ai."): + # Add unmapped attributes with prefix + properties[f"otel.{key}"] = value + + return properties + + +def determine_event_type(log_record: dict[str, Any], attributes: dict[str, Any]) -> str: + """Determine AI event type from log record.""" + event_name = attributes.get("event.name", "").lower() + + # v2 instrumentation events (gen_ai.user.message, gen_ai.assistant.message, gen_ai.choice) + # These should be $ai_generation events to merge with trace span data + if "gen_ai." in event_name and ("message" in event_name or "choice" in event_name): + return "$ai_generation" + + # Check event name for GenAI events + if "prompt" in event_name or "input" in event_name: + return "$ai_generation" + elif "completion" in event_name or "output" in event_name or "response" in event_name: + return "$ai_generation" + elif "embedding" in event_name: + return "$ai_embedding" + + # Check operation name + op_name = attributes.get("gen_ai.operation.name", "").lower() + if op_name in ("chat", "completion"): + return "$ai_generation" + elif op_name in ("embedding", "embeddings"): + return "$ai_embedding" + + # Default to generic span + return "$ai_span" + + +def calculate_timestamp(log_record: dict[str, Any]) -> str: + """Calculate timestamp from log record time.""" + time_nanos = int(log_record.get("time_unix_nano", 0)) + if time_nanos == 0: + # Fallback to observed time if time is not set + time_nanos = int(log_record.get("observed_time_unix_nano", 0)) + + millis = time_nanos // 1_000_000 + return datetime.fromtimestamp(millis / 1000, tz=UTC).isoformat() + + +def extract_distinct_id(resource: dict[str, Any], attributes: dict[str, Any]) -> str: + """Extract distinct_id from resource or attributes.""" + # Try resource attributes + user_id = resource.get("user.id") or resource.get("enduser.id") or resource.get("posthog.distinct_id") + + if user_id and isinstance(user_id, str): + return user_id + + # Try log attributes + if attributes.get("user_id"): + return str(attributes["user_id"]) + if attributes.get("distinct_id"): + return str(attributes["distinct_id"]) + + # Default to anonymous + return "anonymous" diff --git a/products/llm_analytics/backend/api/otel/parser.py b/products/llm_analytics/backend/api/otel/parser.py new file mode 100644 index 000000000000..817751b3b620 --- /dev/null +++ b/products/llm_analytics/backend/api/otel/parser.py @@ -0,0 +1,190 @@ +""" +OTLP protobuf parser for OpenTelemetry traces. + +Parses ExportTraceServiceRequest protobuf messages and extracts spans, +resource attributes, and instrumentation scope information. 
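+
+Illustrative usage (shape as returned by parse_otlp_request; variable names assumed):
+
+    for item in parse_otlp_request(protobuf_body):
+        span, resource, scope = item["span"], item["resource"], item["scope"]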
+""" + +from typing import Any + +from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import ExportTraceServiceRequest +from opentelemetry.proto.common.v1.common_pb2 import AnyValue, KeyValue +from opentelemetry.proto.trace.v1.trace_pb2 import Span + + +def parse_otlp_request(protobuf_data: bytes) -> list[dict[str, Any]]: + """ + Parse OTLP ExportTraceServiceRequest from protobuf bytes. + + Returns a list of dicts, each containing: + - span: parsed span dict + - resource: dict of resource attributes for this span + - scope: dict of instrumentation scope info for this span + + Each span carries its own resource/scope context to handle requests + containing multiple resource_spans/scope_spans correctly. + """ + request = ExportTraceServiceRequest() + request.ParseFromString(protobuf_data) + + results = [] + + # OTLP structure: resource_spans -> scope_spans -> spans + for resource_spans in request.resource_spans: + # Extract resource attributes (service.name, etc.) + resource_attrs = {} + if resource_spans.HasField("resource"): + resource_attrs = parse_attributes(resource_spans.resource.attributes) + + # Iterate through scope spans + for scope_spans in resource_spans.scope_spans: + # Extract instrumentation scope + scope_info = {} + if scope_spans.HasField("scope"): + scope_info = { + "name": scope_spans.scope.name, + "version": scope_spans.scope.version if scope_spans.scope.version else None, + "attributes": parse_attributes(scope_spans.scope.attributes) + if scope_spans.scope.attributes + else {}, + } + + # Parse each span with its resource/scope context + for span in scope_spans.spans: + parsed_span = parse_span(span) + results.append( + { + "span": parsed_span, + "resource": resource_attrs, + "scope": scope_info, + } + ) + + return results + + +def parse_span(span: Span) -> dict[str, Any]: + """ + Parse a single OTLP span into a dict. + """ + return { + "trace_id": span.trace_id.hex(), + "span_id": span.span_id.hex(), + "parent_span_id": span.parent_span_id.hex() if span.parent_span_id else None, + "name": span.name, + "kind": span.kind, + "start_time_unix_nano": str(span.start_time_unix_nano), + "end_time_unix_nano": str(span.end_time_unix_nano), + "attributes": parse_attributes(span.attributes), + "events": [parse_span_event(event) for event in span.events], + "links": [parse_span_link(link) for link in span.links], + "status": parse_span_status(span.status), + } + + +def parse_span_event(event) -> dict[str, Any]: + """ + Parse a span event. + """ + return { + "time_unix_nano": str(event.time_unix_nano), + "name": event.name, + "attributes": parse_attributes(event.attributes), + } + + +def parse_span_link(link) -> dict[str, Any]: + """ + Parse a span link. + """ + return { + "trace_id": link.trace_id.hex(), + "span_id": link.span_id.hex(), + "attributes": parse_attributes(link.attributes), + } + + +def parse_span_status(status) -> dict[str, Any]: + """ + Parse span status. + """ + return { + "code": status.code, + "message": status.message if status.message else None, + } + + +def parse_attributes(attributes: list[KeyValue]) -> dict[str, Any]: + """ + Parse OpenTelemetry attributes (key-value pairs) into a dict. + + Handles different value types: string, int, double, bool, array, kvlist. + """ + result = {} + + for kv in attributes: + key = kv.key + value = parse_any_value(kv.value) + result[key] = value + + return result + + +def parse_any_value(value: AnyValue) -> Any: + """ + Parse an AnyValue protobuf type into a Python value. 
+ + AnyValue can be: + - string_value + - int_value + - double_value + - bool_value + - array_value (list of AnyValue) + - kvlist_value (dict of key-value pairs) + - bytes_value + """ + # Check which field is set + which = value.WhichOneof("value") + + if which == "string_value": + return value.string_value + elif which == "int_value": + return value.int_value + elif which == "double_value": + return value.double_value + elif which == "bool_value": + return value.bool_value + elif which == "array_value": + return [parse_any_value(item) for item in value.array_value.values] + elif which == "kvlist_value": + return parse_attributes(value.kvlist_value.values) + elif which == "bytes_value": + return value.bytes_value.hex() + else: + # Unknown or unset + return None + + +def parse_baggage_header(baggage_header: str | None) -> dict[str, str]: + """ + Parse OTel baggage from HTTP header. + + Baggage format: key1=value1,key2=value2,... + + Example: session_id=abc123,user_id=user_456 + """ + if not baggage_header: + return {} + + baggage = {} + + # Split by comma + items = baggage_header.split(",") + + for item in items: + item = item.strip() + if "=" in item: + key, value = item.split("=", 1) + baggage[key.strip()] = value.strip() + + return baggage diff --git a/products/llm_analytics/backend/api/otel/test_event_merger.py b/products/llm_analytics/backend/api/otel/test_event_merger.py new file mode 100644 index 000000000000..8120af795c72 --- /dev/null +++ b/products/llm_analytics/backend/api/otel/test_event_merger.py @@ -0,0 +1,168 @@ +""" +Unit tests for Redis-backed event_merger bidirectional merging logic. +""" + +import json + +import pytest +from unittest.mock import MagicMock, patch + +from products.llm_analytics.backend.api.otel.event_merger import cache_and_merge_properties + + +@pytest.fixture +def mock_redis(): + """Mock Redis client for testing.""" + redis_mock = MagicMock() + redis_mock.cache = {} # Internal dict to simulate Redis storage + + def mock_get(key): + return redis_mock.cache.get(key) + + def mock_setex(key, ttl, value): + redis_mock.cache[key] = value + return True + + def mock_delete(*keys): + for key in keys: + redis_mock.cache.pop(key, None) + return len(keys) + + redis_mock.get.side_effect = mock_get + redis_mock.setex.side_effect = mock_setex + redis_mock.delete.side_effect = mock_delete + + with patch("products.llm_analytics.backend.api.otel.event_merger.get_client", return_value=redis_mock): + yield redis_mock + + +def test_logs_first_then_trace(mock_redis): + """Test scenario: logs arrive before trace.""" + # 1. Log arrives first + log_props = {"$ai_input": [{"role": "user", "content": "Hello"}], "$ai_trace_id": "trace123"} + result = cache_and_merge_properties("trace123", "span456", log_props, is_trace=False) + + assert result is None, "First arrival (log) should cache and return None" + assert "otel_merge:logs:trace123:span456" in mock_redis.cache, "Log should be cached" + + # 2. 
Trace arrives second + trace_props = {"$ai_model": "gpt-4", "$ai_input_tokens": 10, "$ai_output_tokens": 20} + result = cache_and_merge_properties("trace123", "span456", trace_props, is_trace=True) + + assert result is not None, "Second arrival (trace) should return merged properties" + assert "$ai_input" in result, "Should include log content" + assert "$ai_model" in result, "Should include trace metadata" + assert result["$ai_input_tokens"] == 10, "Should include trace tokens" + assert "otel_merge:logs:trace123:span456" not in mock_redis.cache, "Cache should be cleaned up" + + +def test_trace_first_then_logs(mock_redis): + """Test scenario: trace arrives before logs.""" + # 1. Trace arrives first + trace_props = {"$ai_model": "gpt-4", "$ai_input_tokens": 10, "$ai_output_tokens": 20} + result = cache_and_merge_properties("trace789", "span012", trace_props, is_trace=True) + + assert result is None, "First arrival (trace) should cache and return None" + assert "otel_merge:trace:trace789:span012" in mock_redis.cache, "Trace should be cached" + + # 2. Log arrives second + log_props = {"$ai_input": [{"role": "user", "content": "Hello"}]} + result = cache_and_merge_properties("trace789", "span012", log_props, is_trace=False) + + assert result is not None, "Second arrival (log) should return merged properties" + assert "$ai_input" in result, "Should include log content" + assert "$ai_model" in result, "Should include trace metadata" + assert result["$ai_input_tokens"] == 10, "Should include trace tokens" + assert "otel_merge:trace:trace789:span012" not in mock_redis.cache, "Cache should be cleaned up" + + +def test_multiple_logs_accumulation(mock_redis): + """Test that multiple logs accumulate before merging with trace.""" + # 1. First log arrives + log1_props = {"$ai_input": [{"role": "user", "content": "Hello"}]} + result = cache_and_merge_properties("trace111", "span222", log1_props, is_trace=False) + + assert result is None, "First log should cache and return None" + cached_logs = json.loads(mock_redis.cache["otel_merge:logs:trace111:span222"]) + assert "$ai_input" in cached_logs + + # 2. Second log arrives (assistant response) + log2_props = {"$ai_output_choices": [{"role": "assistant", "content": "Hi there"}]} + result = cache_and_merge_properties("trace111", "span222", log2_props, is_trace=False) + + assert result is None, "Second log should accumulate and return None (no trace yet)" + cached_logs = json.loads(mock_redis.cache["otel_merge:logs:trace111:span222"]) + assert "$ai_input" in cached_logs, "Should have first log content" + assert "$ai_output_choices" in cached_logs, "Should have second log content" + + # 3. Trace arrives - merges with accumulated logs + trace_props = {"$ai_model": "gpt-4", "$ai_input_tokens": 10, "$ai_output_tokens": 20} + result = cache_and_merge_properties("trace111", "span222", trace_props, is_trace=True) + + assert result is not None, "Trace should return merged properties with accumulated logs" + assert "$ai_input" in result, "Should include first log" + assert "$ai_output_choices" in result, "Should include second log" + assert "$ai_model" in result, "Should include trace metadata" + assert "otel_merge:logs:trace111:span222" not in mock_redis.cache, "Logs cache cleaned up" + assert "otel_merge:trace:trace111:span222" not in mock_redis.cache, "Trace cache cleaned up" + + +def test_property_precedence(mock_redis): + """Test that trace properties override log properties on conflicts.""" + # 1. 
Log with model info + log_props = {"$ai_model": "gpt-3.5", "$ai_input": [{"role": "user", "content": "Hello"}]} + cache_and_merge_properties("trace333", "span444", log_props, is_trace=False) + + # 2. Trace with different model (should override) + trace_props = {"$ai_model": "gpt-4", "$ai_input_tokens": 10} + result = cache_and_merge_properties("trace333", "span444", trace_props, is_trace=True) + + assert result["$ai_model"] == "gpt-4", "Trace properties should override log properties" + assert "$ai_input" in result, "Should keep non-conflicting log properties" + assert result["$ai_input_tokens"] == 10, "Should include trace-only properties" + + +def test_redis_error_fallback(mock_redis): + """Test that Redis errors fall back to immediate send.""" + # Simulate Redis error + mock_redis.get.side_effect = Exception("Redis connection failed") + + trace_props = {"$ai_model": "gpt-4", "$ai_input_tokens": 10} + result = cache_and_merge_properties("trace555", "span666", trace_props, is_trace=True) + + # Should return properties immediately on error (no merging) + assert result is not None, "Should fallback to immediate send on Redis error" + assert result == trace_props, "Should return original properties unchanged" + + +def test_cache_keys_are_separate(mock_redis): + """Test that trace and log caches use separate key namespaces.""" + # Cache a trace for span1 + trace_props = {"$ai_model": "gpt-4"} + result = cache_and_merge_properties("trace777", "span888", trace_props, is_trace=True) + + assert result is None, "First arrival should cache" + assert "otel_merge:trace:trace777:span888" in mock_redis.cache + + # Cache a log for different span (span2) - should not merge with span1 + log_props = {"$ai_input": [{"role": "user", "content": "Hello"}]} + result = cache_and_merge_properties("trace777", "span999", log_props, is_trace=False) + + assert result is None, "Log for different span should cache separately" + assert "otel_merge:logs:trace777:span999" in mock_redis.cache + assert "otel_merge:trace:trace777:span888" in mock_redis.cache, "Original trace still cached" + + +def test_ttl_is_set(mock_redis): + """Test that cached entries have TTL set.""" + trace_props = {"$ai_model": "gpt-4"} + cache_and_merge_properties("trace999", "span000", trace_props, is_trace=True) + + # Verify setex was called with TTL (60 seconds) + mock_redis.setex.assert_called() + call_args = mock_redis.setex.call_args + assert call_args[0][1] == 60, "TTL should be 60 seconds" + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/products/llm_analytics/backend/api/otel/test_ingestion_parity.py b/products/llm_analytics/backend/api/otel/test_ingestion_parity.py new file mode 100644 index 000000000000..df4544b881d8 --- /dev/null +++ b/products/llm_analytics/backend/api/otel/test_ingestion_parity.py @@ -0,0 +1,554 @@ +""" +Integration tests for ingestion parity between SDK, OTEL v1, and OTEL v2. + +Validates that all three ingestion methods produce structurally equivalent +AI events with the same core properties. The goal is to ensure that +regardless of how data enters PostHog (SDK, OTEL v1, OTEL v2), the resulting +events are consistent and comparable. 
+ +Expected differences (acceptable): +- trace_id format: SDK uses UUIDs, OTEL uses hex strings +- tool_calls structure: SDK and v2 use nested arrays, v1 uses flattened keys +- output structure: SDK may wrap content differently + +Required parity (must match): +- $ai_input contains full conversation history +- $ai_output_choices contains assistant response +- $ai_model, $ai_provider, $ai_input_tokens, $ai_output_tokens present +- All messages from multi-turn conversation preserved +""" + +import pytest +from unittest.mock import patch + +from parameterized import parameterized + +from products.llm_analytics.backend.api.otel.ingestion import transform_logs_to_ai_events +from products.llm_analytics.backend.api.otel.transformer import transform_span_to_ai_event + + +def create_v1_span_with_conversation( + trace_id: str = "a6b23ecb43aa99ff43ff70948b0a377f", + span_id: str = "fee4e58c7137b7ef", + model: str = "gpt-4o-mini", + input_tokens: int = 264, + output_tokens: int = 25, +) -> tuple[dict, dict, dict]: + """ + Create a v1 OTEL span with multi-turn conversation in indexed attributes. + + Based on actual opentelemetry.instrumentation.openai.v1 output: + - Messages in gen_ai.prompt.{i}.role/content + - Tool calls in gen_ai.prompt.{i}.tool_calls.{j}.id/name/arguments + - Tool responses have tool_call_id + - Functions in llm.request.functions.{i}.name/description/parameters + - Completions in gen_ai.completion.{i}.role/content/finish_reason + """ + # Build attributes matching real v1 instrumentation format + attributes = { + # Request metadata + "llm.request.type": "chat", + "gen_ai.system": "openai", + "gen_ai.request.model": model, + "gen_ai.request.max_tokens": 100, + "llm.headers": "None", + "llm.is_streaming": False, + "gen_ai.openai.api_base": "https://api.openai.com/v1/", + # Conversation messages (indexed) + "gen_ai.prompt.0.role": "system", + "gen_ai.prompt.0.content": "You are a helpful assistant.", + "gen_ai.prompt.1.role": "user", + "gen_ai.prompt.1.content": "Hi there!", + "gen_ai.prompt.2.role": "assistant", + "gen_ai.prompt.2.content": "Hello! How can I help?", + "gen_ai.prompt.3.role": "user", + "gen_ai.prompt.3.content": "What's the weather?", + # Assistant tool call (no content, has tool_calls) + "gen_ai.prompt.4.role": "assistant", + "gen_ai.prompt.4.tool_calls.0.id": "call_abc123", + "gen_ai.prompt.4.tool_calls.0.name": "get_weather", + "gen_ai.prompt.4.tool_calls.0.arguments": '{"location":"Paris"}', + # Tool response + "gen_ai.prompt.5.role": "tool", + "gen_ai.prompt.5.content": "Sunny, 18°C", + "gen_ai.prompt.5.tool_call_id": "call_abc123", + # Continued conversation + "gen_ai.prompt.6.role": "assistant", + "gen_ai.prompt.6.content": "The weather is sunny at 18°C.", + "gen_ai.prompt.7.role": "user", + "gen_ai.prompt.7.content": "Thanks, bye!", + # Tool definitions + "llm.request.functions.0.name": "get_weather", + "llm.request.functions.0.description": "Get weather for a location", + "llm.request.functions.0.parameters": '{"type": "object", "properties": {"location": {"type": "string"}}, "required": ["location"]}', + # Response metadata + "gen_ai.response.model": "gpt-4o-mini-2024-07-18", + "gen_ai.response.id": "chatcmpl-test123", + "llm.usage.total_tokens": input_tokens + output_tokens, + "gen_ai.usage.input_tokens": input_tokens, + "gen_ai.usage.output_tokens": output_tokens, + # Completion + "gen_ai.completion.0.finish_reason": "stop", + "gen_ai.completion.0.role": "assistant", + "gen_ai.completion.0.content": "You're welcome! 
Goodbye!", + } + + span = { + "trace_id": trace_id, + "span_id": span_id, + "parent_span_id": None, + "name": "openai.chat", + "kind": 3, + "start_time_unix_nano": 1700000000000000000, + "end_time_unix_nano": 1700000001000000000, + "attributes": attributes, + "status": {"code": 1}, + } + + resource = {"service.name": "test-service"} + # Match actual scope name from real instrumentation + scope = {"name": "opentelemetry.instrumentation.openai.v1", "version": "0.40.0"} + + return span, resource, scope + + +def create_v2_logs_with_conversation( + trace_id: str = "af4e25c0d86a2f7bebd2e0c84f072499", + span_id: str = "a19561fe0a9d2d73", +) -> list[dict]: + """ + Create v2 OTEL logs request with multi-turn conversation. + + Based on actual opentelemetry.instrumentation.openai_v2 output: + - Each log has attributes with gen_ai.system and event.name + - Body contains content directly (not nested in message for input) + - gen_ai.choice logs have message wrapper with role/content + finish_reason + - Event names: gen_ai.system.message, gen_ai.user.message, gen_ai.choice + + Returns a list of dicts, each containing: + - log: parsed log record dict + - resource: dict of resource attributes for this log + - scope: dict of instrumentation scope info for this log + """ + base_time = 1700000000000000000 + resource = {"service.name": "test-service"} + # Match actual scope name from real instrumentation + scope = {"name": "opentelemetry.instrumentation.openai_v2", "version": "2.0.0"} + + logs = [ + # System message + { + "trace_id": trace_id, + "span_id": span_id, + "attributes": { + "gen_ai.system": "openai", + "event.name": "gen_ai.system.message", + }, + "body": {"content": "You are a helpful assistant."}, + "time_unix_nano": base_time, + }, + # User message 1 + { + "trace_id": trace_id, + "span_id": span_id, + "attributes": { + "gen_ai.system": "openai", + "event.name": "gen_ai.user.message", + }, + "body": {"content": "Hi there!"}, + "time_unix_nano": base_time + 1000000, + }, + # Assistant message (from previous turn, now in context) + { + "trace_id": trace_id, + "span_id": span_id, + "attributes": { + "gen_ai.system": "openai", + "event.name": "gen_ai.assistant.message", + }, + "body": {"content": "Hello! 
How can I help?"}, + "time_unix_nano": base_time + 2000000, + }, + # User message 2 + { + "trace_id": trace_id, + "span_id": span_id, + "attributes": { + "gen_ai.system": "openai", + "event.name": "gen_ai.user.message", + }, + "body": {"content": "What's the weather?"}, + "time_unix_nano": base_time + 3000000, + }, + # Assistant tool call + { + "trace_id": trace_id, + "span_id": span_id, + "attributes": { + "gen_ai.system": "openai", + "event.name": "gen_ai.assistant.message", + }, + "body": { + "tool_calls": [ + { + "id": "call_123", + "type": "function", + "function": {"name": "get_weather", "arguments": '{"location":"Paris"}'}, + } + ], + }, + "time_unix_nano": base_time + 4000000, + }, + # Tool response + { + "trace_id": trace_id, + "span_id": span_id, + "attributes": { + "gen_ai.system": "openai", + "event.name": "gen_ai.tool.message", + }, + "body": {"content": "Sunny, 18°C", "id": "call_123"}, + "time_unix_nano": base_time + 5000000, + }, + # Assistant message after tool + { + "trace_id": trace_id, + "span_id": span_id, + "attributes": { + "gen_ai.system": "openai", + "event.name": "gen_ai.assistant.message", + }, + "body": {"content": "The weather is sunny at 18°C."}, + "time_unix_nano": base_time + 6000000, + }, + # User message 3 + { + "trace_id": trace_id, + "span_id": span_id, + "attributes": { + "gen_ai.system": "openai", + "event.name": "gen_ai.user.message", + }, + "body": {"content": "Thanks, bye!"}, + "time_unix_nano": base_time + 7000000, + }, + # Final choice/completion - note the different structure + { + "trace_id": trace_id, + "span_id": span_id, + "attributes": { + "gen_ai.system": "openai", + "event.name": "gen_ai.choice", + }, + "body": { + "index": 0, + "finish_reason": "stop", + "message": {"role": "assistant", "content": "You're welcome! Goodbye!"}, + }, + "time_unix_nano": base_time + 8000000, + }, + ] + + # Return in per-item format with resource/scope context + return [{"log": log, "resource": resource, "scope": scope} for log in logs] + + +def create_v2_span_metadata( + trace_id: str = "af4e25c0d86a2f7bebd2e0c84f072499", + span_id: str = "a19561fe0a9d2d73", + model: str = "gpt-4o-mini", + input_tokens: int = 234, + output_tokens: int = 18, +) -> tuple[dict, dict, dict]: + """ + Create v2 OTEL span (metadata only, no content). + + In v2 instrumentation, spans contain only metadata (model, tokens, etc.) + while message content is sent via separate log records. 
+ """ + span = { + "trace_id": trace_id, + "span_id": span_id, + "parent_span_id": None, + "name": "chat gpt-4o-mini", + "kind": 3, + "start_time_unix_nano": 1700000000000000000, + "end_time_unix_nano": 1700000001000000000, + "attributes": { + "gen_ai.system": "openai", + "gen_ai.request.model": model, + "gen_ai.operation.name": "chat", + "gen_ai.usage.input_tokens": input_tokens, + "gen_ai.usage.output_tokens": output_tokens, + }, + "status": {"code": 1}, + } + + resource = {"service.name": "test-service"} + # Match actual scope name from real v2 instrumentation + scope = {"name": "opentelemetry.instrumentation.openai_v2", "version": "2.0.0"} + + return span, resource, scope + + +class TestV1SpanTransformation: + """Tests for OTEL v1 span transformation.""" + + def test_v1_span_produces_ai_generation_event(self): + """v1 span with conversation should produce $ai_generation event.""" + span, resource, scope = create_v1_span_with_conversation() + + event = transform_span_to_ai_event(span, resource, scope) + + assert event is not None + assert event["event"] == "$ai_generation" + + def test_v1_span_contains_full_conversation_history(self): + """v1 span $ai_input should contain all conversation messages.""" + span, resource, scope = create_v1_span_with_conversation() + + event = transform_span_to_ai_event(span, resource, scope) + + assert event is not None + props = event["properties"] + assert "$ai_input" in props + assert len(props["$ai_input"]) == 8 + + def test_v1_span_contains_output_choices(self): + """v1 span $ai_output_choices should contain assistant response.""" + span, resource, scope = create_v1_span_with_conversation() + + event = transform_span_to_ai_event(span, resource, scope) + + assert event is not None + props = event["properties"] + assert "$ai_output_choices" in props + assert len(props["$ai_output_choices"]) == 1 + assert props["$ai_output_choices"][0]["content"] == "You're welcome! Goodbye!" 
+ + def test_v1_span_contains_model_metadata(self): + """v1 span should contain model, provider, and token counts.""" + span, resource, scope = create_v1_span_with_conversation() + + event = transform_span_to_ai_event(span, resource, scope) + + assert event is not None + props = event["properties"] + assert props["$ai_model"] == "gpt-4o-mini" + assert props["$ai_provider"] == "openai" + assert props["$ai_input_tokens"] == 264 + assert props["$ai_output_tokens"] == 25 + + def test_v1_span_preserves_trace_context(self): + """v1 span should preserve trace_id and span_id.""" + span, resource, scope = create_v1_span_with_conversation() + + event = transform_span_to_ai_event(span, resource, scope) + + assert event is not None + props = event["properties"] + assert props["$ai_trace_id"] == "a6b23ecb43aa99ff43ff70948b0a377f" + assert props["$ai_span_id"] == "fee4e58c7137b7ef" + + +class TestV2LogsAccumulation: + """Tests for OTEL v2 logs accumulation.""" + + def test_v2_logs_accumulate_conversation_history(self): + """v2 logs should accumulate full conversation in $ai_input.""" + parsed_request = create_v2_logs_with_conversation() + + with patch("products.llm_analytics.backend.api.otel.event_merger.cache_and_merge_properties") as mock_merger: + + def return_accumulated(trace_id, span_id, props, is_trace): + if "$ai_input" in props and len(props["$ai_input"]) >= 7: + return props + return None + + mock_merger.side_effect = return_accumulated + + _events = transform_logs_to_ai_events(parsed_request) + + assert mock_merger.call_count == 1 + call_args = mock_merger.call_args + props = call_args[0][2] + + assert "$ai_input" in props + assert len(props["$ai_input"]) >= 7 + + def test_v2_logs_preserve_message_order(self): + """v2 logs should preserve chronological message order.""" + parsed_request = create_v2_logs_with_conversation() + + with patch("products.llm_analytics.backend.api.otel.event_merger.cache_and_merge_properties") as mock_merger: + + def return_accumulated(trace_id, span_id, props, is_trace): + if "$ai_input" in props: + return props + return None + + mock_merger.side_effect = return_accumulated + + _events = transform_logs_to_ai_events(parsed_request) + + props = mock_merger.call_args[0][2] + + assert props["$ai_input"][0]["role"] == "system" + assert props["$ai_input"][1]["role"] == "user" + assert props["$ai_input"][1]["content"] == "Hi there!" + + def test_v2_logs_contain_output_choices(self): + """v2 logs should contain final response in $ai_output_choices.""" + parsed_request = create_v2_logs_with_conversation() + + with patch("products.llm_analytics.backend.api.otel.event_merger.cache_and_merge_properties") as mock_merger: + + def return_accumulated(trace_id, span_id, props, is_trace): + if "$ai_output_choices" in props: + return props + return None + + mock_merger.side_effect = return_accumulated + + _events = transform_logs_to_ai_events(parsed_request) + + props = mock_merger.call_args[0][2] + + assert "$ai_output_choices" in props + assert len(props["$ai_output_choices"]) == 1 + assert props["$ai_output_choices"][0]["content"] == "You're welcome! Goodbye!" 
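+
+
+# Illustrative sketch (not used by the tests here): how v1 indexed attributes
+# such as gen_ai.prompt.0.role / gen_ai.prompt.0.content can be folded back
+# into an ordered message list. The production logic lives in transformer.py
+# and the conventions/ package; this simplified version ignores nested keys
+# like gen_ai.prompt.4.tool_calls.0.id.
+def _sketch_collapse_indexed_prompts(attributes: dict) -> list[dict]:
+    messages: dict[int, dict] = {}
+    for key, value in attributes.items():
+        if not key.startswith("gen_ai.prompt."):
+            continue
+        parts = key.split(".", 3)  # ["gen_ai", "prompt", "<index>", "<field>"]
+        if len(parts) < 4 or not parts[2].isdigit() or "." in parts[3]:
+            continue
+        messages.setdefault(int(parts[2]), {})[parts[3]] = value
+    return [messages[i] for i in sorted(messages)]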
+ + +class TestIngestionParity: + """Tests for parity between SDK-style, OTEL v1, and OTEL v2 events.""" + + @parameterized.expand( + [ + ("model", "$ai_model"), + ("provider", "$ai_provider"), + ("input_tokens", "$ai_input_tokens"), + ("output_tokens", "$ai_output_tokens"), + ] + ) + def test_v1_contains_required_property(self, name: str, property_key: str): + """v1 events must contain core properties.""" + span, resource, scope = create_v1_span_with_conversation() + event = transform_span_to_ai_event(span, resource, scope) + + assert event is not None + assert property_key in event["properties"], f"Missing {property_key}" + + def test_v1_and_v2_produce_same_message_count(self): + """v1 and v2 should produce same number of input messages.""" + v1_span, v1_resource, v1_scope = create_v1_span_with_conversation() + v1_event = transform_span_to_ai_event(v1_span, v1_resource, v1_scope) + + v2_logs = create_v2_logs_with_conversation() + + with patch("products.llm_analytics.backend.api.otel.event_merger.cache_and_merge_properties") as mock_merger: + + def return_accumulated(trace_id, span_id, props, is_trace): + if "$ai_input" in props: + return props + return None + + mock_merger.side_effect = return_accumulated + _events = transform_logs_to_ai_events(v2_logs) + v2_props = mock_merger.call_args[0][2] + + v1_input = v1_event["properties"]["$ai_input"] + v2_input = v2_props["$ai_input"] + + assert len(v1_input) == len(v2_input), f"Message count mismatch: v1={len(v1_input)}, v2={len(v2_input)}" + + def test_v1_and_v2_have_same_output_structure(self): + """v1 and v2 should both have $ai_output_choices as array.""" + v1_span, v1_resource, v1_scope = create_v1_span_with_conversation() + v1_event = transform_span_to_ai_event(v1_span, v1_resource, v1_scope) + + v2_logs = create_v2_logs_with_conversation() + + with patch("products.llm_analytics.backend.api.otel.event_merger.cache_and_merge_properties") as mock_merger: + + def return_accumulated(trace_id, span_id, props, is_trace): + if "$ai_output_choices" in props: + return props + return None + + mock_merger.side_effect = return_accumulated + _events = transform_logs_to_ai_events(v2_logs) + v2_props = mock_merger.call_args[0][2] + + v1_output = v1_event["properties"]["$ai_output_choices"] + v2_output = v2_props["$ai_output_choices"] + + assert isinstance(v1_output, list), "v1 output should be list" + assert isinstance(v2_output, list), "v2 output should be list" + assert len(v1_output) == len(v2_output) == 1 + + +class TestToolCallParity: + """Tests for tool call handling parity.""" + + def test_v1_preserves_tool_messages(self): + """v1 should preserve tool messages in conversation.""" + span, resource, scope = create_v1_span_with_conversation() + event = transform_span_to_ai_event(span, resource, scope) + + props = event["properties"] + roles = [msg.get("role") for msg in props["$ai_input"]] + + assert "tool" in roles, "v1 should preserve tool messages" + + def test_v2_preserves_tool_messages(self): + """v2 should preserve tool messages in conversation.""" + parsed_request = create_v2_logs_with_conversation() + + with patch("products.llm_analytics.backend.api.otel.event_merger.cache_and_merge_properties") as mock_merger: + + def return_accumulated(trace_id, span_id, props, is_trace): + if "$ai_input" in props: + return props + return None + + mock_merger.side_effect = return_accumulated + _events = transform_logs_to_ai_events(parsed_request) + props = mock_merger.call_args[0][2] + + roles = [msg.get("role") for msg in props["$ai_input"]] + assert 
"tool" in roles, "v2 should preserve tool messages" + + +class TestEventTypeConsistency: + """Tests for event type determination consistency.""" + + def test_v1_generation_span_is_ai_generation(self): + """v1 span with LLM attrs should be $ai_generation.""" + span, resource, scope = create_v1_span_with_conversation() + event = transform_span_to_ai_event(span, resource, scope) + + assert event["event"] == "$ai_generation" + + def test_v2_merged_event_can_be_ai_generation(self): + """v2 merged event with LLM attrs should be $ai_generation.""" + span, resource, scope = create_v2_span_metadata() + + mock_merged_props = { + "$ai_model": "gpt-4o-mini", + "$ai_provider": "openai", + "$ai_input_tokens": 234, + "$ai_output_tokens": 18, + "$ai_input": [{"role": "user", "content": "Hello"}], + "$ai_output_choices": [{"role": "assistant", "content": "Hi!"}], + } + + with patch("products.llm_analytics.backend.api.otel.transformer.cache_and_merge_properties") as mock_merger: + mock_merger.return_value = mock_merged_props + event = transform_span_to_ai_event(span, resource, scope) + + assert event is not None + assert event["event"] == "$ai_generation" + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/products/llm_analytics/backend/api/otel/test_logs_accumulation.py b/products/llm_analytics/backend/api/otel/test_logs_accumulation.py new file mode 100644 index 000000000000..017809fa28bb --- /dev/null +++ b/products/llm_analytics/backend/api/otel/test_logs_accumulation.py @@ -0,0 +1,303 @@ +""" +Unit tests for v2 logs accumulation in ingestion.py. + +Tests that multiple log events for the same span correctly accumulate +message arrays instead of overwriting them. +""" + +import pytest +from unittest.mock import patch + +from products.llm_analytics.backend.api.otel.ingestion import transform_logs_to_ai_events + + +def _make_parsed_items(logs: list[dict], resource: dict, scope: dict) -> list[dict]: + """Helper to convert old format to new per-item format.""" + return [{"log": log, "resource": resource, "scope": scope} for log in logs] + + +def test_multiple_user_messages_accumulate(): + """Test that multiple user messages accumulate into $ai_input array.""" + # Simulate multiple log records for same span with different user messages + logs = [ + { + "trace_id": "trace123", + "span_id": "span456", + "attributes": {"event.name": "gen_ai.user.message"}, + "body": {"content": "hi there"}, + "time_unix_nano": 1000000000, + }, + { + "trace_id": "trace123", + "span_id": "span456", + "attributes": {"event.name": "gen_ai.user.message"}, + "body": {"content": "k bye"}, + "time_unix_nano": 2000000000, + }, + ] + resource = {"service.name": "test-service"} + scope = {"name": "test-scope"} + parsed_items = _make_parsed_items(logs, resource, scope) + + # Mock event merger to return merged properties on second call + with patch("products.llm_analytics.backend.api.otel.event_merger.cache_and_merge_properties") as mock_merger: + # First log: cache (return None) + # Second log: return merged (should have both messages) + def merger_side_effect(trace_id, span_id, props, is_trace): + # Check that props contains both messages on second call + if "$ai_input" in props and len(props["$ai_input"]) == 2: + return props # Return accumulated properties + return None # First arrival, cache + + mock_merger.side_effect = merger_side_effect + + _events = transform_logs_to_ai_events(parsed_items) + + # Verify accumulation happened before calling merger + # The merger should have been called once with both messages 
accumulated + assert mock_merger.call_count == 1 + call_args = mock_merger.call_args + props = call_args[0][2] # Third argument is properties + assert "$ai_input" in props + assert len(props["$ai_input"]) == 2 + assert props["$ai_input"][0]["content"] == "hi there" + assert props["$ai_input"][1]["content"] == "k bye" + + +def test_user_and_assistant_messages_accumulate(): + """Test that conversation history (including assistant messages) goes into $ai_input.""" + logs = [ + { + "trace_id": "trace789", + "span_id": "span012", + "attributes": {"event.name": "gen_ai.user.message"}, + "body": {"content": "hello"}, + "time_unix_nano": 1000000000, + }, + { + "trace_id": "trace789", + "span_id": "span012", + "attributes": {"event.name": "gen_ai.assistant.message"}, + "body": {"content": "hi there!"}, + "time_unix_nano": 2000000000, + }, + { + "trace_id": "trace789", + "span_id": "span012", + "attributes": {"event.name": "gen_ai.user.message"}, + "body": {"content": "tell me a joke"}, + "time_unix_nano": 3000000000, + }, + { + "trace_id": "trace789", + "span_id": "span012", + "attributes": {"event.name": "gen_ai.choice"}, + "body": { + "message": {"role": "assistant", "content": "Why did the chicken cross the road?"}, + "finish_reason": "stop", + }, + "time_unix_nano": 4000000000, + }, + ] + resource = {"service.name": "test-service"} + scope = {"name": "test-scope"} + parsed_items = _make_parsed_items(logs, resource, scope) + + with patch("products.llm_analytics.backend.api.otel.event_merger.cache_and_merge_properties") as mock_merger: + + def merger_side_effect(trace_id, span_id, props, is_trace): + # Return accumulated properties if we have both input and output + if "$ai_input" in props and "$ai_output_choices" in props: + return props + return None + + mock_merger.side_effect = merger_side_effect + + _events = transform_logs_to_ai_events(parsed_items) + + # Verify conversation history (user + assistant) accumulated in $ai_input + # and only current response in $ai_output_choices + assert mock_merger.call_count == 1 + call_args = mock_merger.call_args + props = call_args[0][2] + assert "$ai_input" in props + assert "$ai_output_choices" in props + # $ai_input should have: user1, assistant1, user2 (conversation context) + assert len(props["$ai_input"]) == 3 + assert props["$ai_input"][0]["content"] == "hello" + assert props["$ai_input"][1]["content"] == "hi there!" + assert props["$ai_input"][2]["content"] == "tell me a joke" + # $ai_output_choices should have only current response + assert len(props["$ai_output_choices"]) == 1 + assert props["$ai_output_choices"][0]["content"] == "Why did the chicken cross the road?" 
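The two tests above pin down the routing rule: conversation-history events append to `$ai_input`, while `gen_ai.choice` carries only the current response into `$ai_output_choices`. A compressed sketch of that dispatch follows; the helper name is hypothetical, the real mapping lives in `logs_transformer.py`, and `finish_reason` handling is omitted:

```python
from typing import Any

_HISTORY_EVENTS = {
    "gen_ai.system.message",
    "gen_ai.user.message",
    "gen_ai.assistant.message",
}


def route_log_record(props: dict[str, Any], log: dict[str, Any]) -> None:
    """Append history to $ai_input; route the final choice to $ai_output_choices."""
    event_name = log.get("attributes", {}).get("event.name", "")
    body = log.get("body") or {}
    if event_name == "gen_ai.choice":
        # Only the current response lands in the output array.
        props.setdefault("$ai_output_choices", []).append(body.get("message", {}))
    elif event_name in _HISTORY_EVENTS:
        # gen_ai.user.message -> role "user", and so on.
        role = event_name.removeprefix("gen_ai.").removesuffix(".message")
        props.setdefault("$ai_input", []).append({"role": role, **body})
    # gen_ai.tool.message also joins $ai_input after normalization; see the
    # sketch following the next test.
```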
+ + +def test_tool_messages_accumulate(): + """Test that tool messages are properly handled and accumulate in conversation history.""" + logs = [ + { + "trace_id": "trace999", + "span_id": "span888", + "attributes": {"event.name": "gen_ai.user.message"}, + "body": {"content": "What's the weather in Paris?"}, + "time_unix_nano": 1000000000, + }, + { + "trace_id": "trace999", + "span_id": "span888", + "attributes": {"event.name": "gen_ai.assistant.message"}, + "body": { + "content": None, + "tool_calls": [ + { + "id": "call_123", + "type": "function", + "function": {"name": "get_weather", "arguments": '{"location":"Paris"}'}, + } + ], + }, + "time_unix_nano": 2000000000, + }, + { + "trace_id": "trace999", + "span_id": "span888", + "attributes": {"event.name": "gen_ai.tool.message"}, + "body": {"content": "Sunny, 18°C", "id": "call_123"}, + "time_unix_nano": 3000000000, + }, + { + "trace_id": "trace999", + "span_id": "span888", + "attributes": {"event.name": "gen_ai.choice"}, + "body": { + "message": {"role": "assistant", "content": "The weather in Paris is sunny with 18°C."}, + "finish_reason": "stop", + }, + "time_unix_nano": 4000000000, + }, + ] + resource = {"service.name": "test-service"} + scope = {"name": "test-scope"} + parsed_items = _make_parsed_items(logs, resource, scope) + + with patch("products.llm_analytics.backend.api.otel.event_merger.cache_and_merge_properties") as mock_merger: + + def merger_side_effect(trace_id, span_id, props, is_trace): + # Return accumulated properties once we have all messages + if "$ai_input" in props and "$ai_output_choices" in props and len(props["$ai_input"]) >= 3: + return props + return None + + mock_merger.side_effect = merger_side_effect + + _events = transform_logs_to_ai_events(parsed_items) + + # Verify tool message was properly accumulated + assert mock_merger.call_count == 1 + call_args = mock_merger.call_args + props = call_args[0][2] + + # $ai_input should have: user, assistant (with tool_calls), tool + assert "$ai_input" in props + assert len(props["$ai_input"]) == 3 + + # Verify user message + assert props["$ai_input"][0]["role"] == "user" + assert props["$ai_input"][0]["content"] == "What's the weather in Paris?" + + # Verify assistant message with tool_calls + assert props["$ai_input"][1]["role"] == "assistant" + assert "tool_calls" in props["$ai_input"][1] + assert props["$ai_input"][1]["tool_calls"][0]["id"] == "call_123" + + # Verify tool message with tool_call_id + assert props["$ai_input"][2]["role"] == "tool" + assert props["$ai_input"][2]["content"] == "Sunny, 18°C" + assert props["$ai_input"][2]["tool_call_id"] == "call_123" + + # Verify final response in output + assert "$ai_output_choices" in props + assert len(props["$ai_output_choices"]) == 1 + assert props["$ai_output_choices"][0]["content"] == "The weather in Paris is sunny with 18°C." 
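The tool-call test above hinges on one rename: OTEL's `gen_ai.tool.message` body carries the call identifier as `id`, while the PostHog message shape uses `tool_call_id` to link the result back to the assistant's `tool_calls` entry. A hypothetical helper showing just that normalization:

```python
def normalize_tool_message(body: dict) -> dict:
    """Sketch: OTEL gen_ai.tool.message body -> PostHog tool message."""
    message = {"role": "tool", "content": body.get("content")}
    if body.get("id"):
        message["tool_call_id"] = body["id"]  # OTEL "id" -> PostHog "tool_call_id"
    return message


# normalize_tool_message({"content": "Sunny, 18°C", "id": "call_123"})
# -> {"role": "tool", "content": "Sunny, 18°C", "tool_call_id": "call_123"}
```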
+
+
+def test_non_array_properties_are_overwritten():
+    """Test that non-array properties use last-value-wins behavior."""
+    logs = [
+        {
+            "trace_id": "trace111",
+            "span_id": "span222",
+            "attributes": {
+                "event.name": "gen_ai.user.message",
+                "gen_ai.request.model": "gpt-3.5",
+                "gen_ai.usage.input_tokens": 10,
+            },
+            "body": {"content": "hello"},
+            "time_unix_nano": 1000000000,
+        },
+        {
+            "trace_id": "trace111",
+            "span_id": "span222",
+            "attributes": {
+                "event.name": "gen_ai.choice",
+                "gen_ai.response.model": "gpt-4",  # Different model in response
+                "gen_ai.usage.output_tokens": 20,
+            },
+            "body": {"message": {"role": "assistant", "content": "hi"}, "finish_reason": "stop"},
+            "time_unix_nano": 2000000000,
+        },
+    ]
+    resource = {"service.name": "test-service"}
+    scope = {"name": "test-scope"}
+    parsed_items = _make_parsed_items(logs, resource, scope)
+
+    with patch("products.llm_analytics.backend.api.otel.event_merger.cache_and_merge_properties") as mock_merger:
+
+        def merger_side_effect(trace_id, span_id, props, is_trace):
+            if "$ai_input" in props and "$ai_output_choices" in props:
+                return props
+            return None
+
+        mock_merger.side_effect = merger_side_effect
+
+        _events = transform_logs_to_ai_events(parsed_items)
+
+    # Verify non-array properties were overwritten (last value wins)
+    assert mock_merger.call_count == 1
+    call_args = mock_merger.call_args
+    props = call_args[0][2]
+    assert props["$ai_model"] == "gpt-4"  # Second log's model wins
+    assert props["$ai_input_tokens"] == 10  # From first log
+    assert props["$ai_output_tokens"] == 20  # From second log
+
+
+def test_single_log_event_works():
+    """Test that single log events still work (no accumulation needed)."""
+    logs = [
+        {
+            "trace_id": "trace333",
+            "span_id": "span444",
+            "attributes": {"event.name": "gen_ai.user.message"},
+            "body": {"content": "single message"},
+            "time_unix_nano": 1000000000,
+        }
+    ]
+    resource = {"service.name": "test-service"}
+    scope = {"name": "test-scope"}
+    parsed_items = _make_parsed_items(logs, resource, scope)
+
+    with patch("products.llm_analytics.backend.api.otel.event_merger.cache_and_merge_properties") as mock_merger:
+        mock_merger.return_value = None  # First arrival, cache
+
+        _events = transform_logs_to_ai_events(parsed_items)
+
+    # Verify single message was processed
+    assert mock_merger.call_count == 1
+    call_args = mock_merger.call_args
+    props = call_args[0][2]
+    assert "$ai_input" in props
+    assert len(props["$ai_input"]) == 1
+    assert props["$ai_input"][0]["content"] == "single message"
+
+
+if __name__ == "__main__":
+    pytest.main([__file__, "-v"])
diff --git a/products/llm_analytics/backend/api/otel/transformer.py b/products/llm_analytics/backend/api/otel/transformer.py
new file mode 100644
index 000000000000..4824b3651feb
--- /dev/null
+++ b/products/llm_analytics/backend/api/otel/transformer.py
@@ -0,0 +1,426 @@
+"""
+Core OTel span to PostHog AI event transformer.
+
+Transforms OpenTelemetry spans into PostHog AI events using a waterfall
+pattern for attribute extraction:
+1. PostHog native attributes (highest priority)
+2. GenAI semantic conventions (fallback)
+3. OTel span built-ins (trace_id, span_id, etc.)
+""" + +import json +from datetime import UTC, datetime +from typing import Any + +from .conventions.genai import detect_provider, extract_genai_attributes +from .conventions.posthog_native import extract_posthog_native_attributes +from .conventions.providers import OtelInstrumentationPattern +from .event_merger import cache_and_merge_properties + +OTEL_TRANSFORMER_VERSION = "1.0.0" + +# Span status codes (from OpenTelemetry spec) +SPAN_STATUS_UNSET = 0 +SPAN_STATUS_OK = 1 +SPAN_STATUS_ERROR = 2 + + +def transform_span_to_ai_event( + span: dict[str, Any], + resource: dict[str, Any], + scope: dict[str, Any], + baggage: dict[str, str] | None = None, +) -> dict[str, Any] | None: + """ + Transform a single OTel span to PostHog AI event. + + Args: + span: Parsed OTel span + resource: Resource attributes (service.name, etc.) + scope: Instrumentation scope info + baggage: Baggage context (session_id, etc.) + + Returns: + PostHog AI event dict OR None if this is first arrival (cached, waiting for logs): + - event: Event type ($ai_generation, $ai_span, etc.) + - distinct_id: User identifier + - timestamp: ISO 8601 timestamp + - properties: AI event properties + - uuid: Event UUID (for deduplication with log events) + """ + baggage = baggage or {} + + # Extract attributes using waterfall pattern + posthog_attrs = extract_posthog_native_attributes(span) + genai_attrs = extract_genai_attributes(span, scope) + + # Merge with precedence: PostHog > GenAI + merged_attrs = {**genai_attrs, **posthog_attrs} + + # Build AI event properties + properties = build_event_properties(span, merged_attrs, resource, scope, baggage) + + # Detect instrumentation pattern to determine event routing: + # + # V1_ATTRIBUTES: Everything in span attributes - send immediately + # V2_TRACES_AND_LOGS: Metadata in span, content in logs - use event merger + # + # Detection priority: + # 1. Provider declares pattern via get_instrumentation_pattern() - most reliable + # 2. Span has prompt/completion attributes - indicates V1 data present + # 3. Span is embedding operation - embeddings don't have associated logs + # 4. 
Default to V2 (safer - waits for logs rather than sending incomplete) + provider = detect_provider(span, scope) + provider_pattern = provider.get_instrumentation_pattern() if provider else None + has_v1_content = bool(merged_attrs.get("prompt") or merged_attrs.get("completion")) + + # Embedding spans are self-contained - they don't have prompt/completion logs + op_name = merged_attrs.get("operation_name", "").lower() + is_embedding = op_name in ("embedding", "embeddings") + + uses_v1_pattern = provider_pattern == OtelInstrumentationPattern.V1_ATTRIBUTES or has_v1_content or is_embedding + + if not uses_v1_pattern: + # V2 instrumentation - use event merger for bidirectional merge with logs + trace_id = span.get("trace_id", "") + span_id = span.get("span_id", "") + if trace_id and span_id: + merged = cache_and_merge_properties(trace_id, span_id, properties, is_trace=True) + if merged is None: + # This is first arrival - trace cached, waiting for logs + # Don't send this event yet + return None + # Second arrival - logs already cached, merged contains complete event + properties = merged + # else: v1 span has everything - send immediately without merging + + # Determine event type + event_type = determine_event_type(span, merged_attrs, scope) + + # Calculate timestamp + timestamp = calculate_timestamp(span) + + # Get distinct_id + distinct_id = extract_distinct_id(resource, baggage) + + # Generate consistent UUID from trace_id + span_id for deduplication + # This allows log events and trace events for the same span to merge + import uuid + + trace_id = span.get("trace_id", "") + span_id = span.get("span_id", "") + event_uuid = None + if trace_id and span_id: + # Create deterministic UUID from trace_id + span_id + namespace = uuid.UUID("00000000-0000-0000-0000-000000000000") + event_uuid = str(uuid.uuid5(namespace, f"{trace_id}:{span_id}")) + + result = { + "event": event_type, + "distinct_id": distinct_id, + "timestamp": timestamp, + "properties": properties, + } + + if event_uuid: + result["uuid"] = event_uuid + + return result + + +def build_event_properties( + span: dict[str, Any], + attrs: dict[str, Any], + resource: dict[str, Any], + scope: dict[str, Any], + baggage: dict[str, str], +) -> dict[str, Any]: + """Build PostHog AI event properties from extracted attributes.""" + attributes = span.get("attributes", {}) + status = span.get("status", {}) + + # Core identifiers (prefer extracted, fallback to span built-ins) + trace_id = attrs.get("trace_id") or span.get("trace_id") + span_id = attrs.get("span_id") or span.get("span_id") + parent_id = attrs.get("parent_id") or span.get("parent_span_id") + + # Session ID (prefer extracted, fallback to baggage) + session_id = attrs.get("session_id") or baggage.get("session_id") or baggage.get("posthog.session_id") + + # Calculate latency + latency = calculate_latency(span) + + # Detect error from span status + is_error = attrs.get("is_error") + if is_error is None: + is_error = status.get("code") == SPAN_STATUS_ERROR + error_message = attrs.get("error_message") + if error_message is None and is_error: + error_message = status.get("message") + + # Build base properties + properties: dict[str, Any] = { + # Core IDs + "$ai_trace_id": trace_id, + "$ai_span_id": span_id, + } + + # Optional core IDs + if parent_id: + properties["$ai_parent_id"] = parent_id + if session_id: + properties["$ai_session_id"] = session_id + if attrs.get("generation_id"): + properties["$ai_generation_id"] = attrs["generation_id"] + + # Model info + if attrs.get("model"): + 
properties["$ai_model"] = attrs["model"] + if attrs.get("provider"): + properties["$ai_provider"] = attrs["provider"] + + # Tokens + if attrs.get("input_tokens") is not None: + properties["$ai_input_tokens"] = attrs["input_tokens"] + if attrs.get("output_tokens") is not None: + properties["$ai_output_tokens"] = attrs["output_tokens"] + if attrs.get("cache_read_tokens") is not None: + properties["$ai_cache_read_input_tokens"] = attrs["cache_read_tokens"] + if attrs.get("cache_write_tokens") is not None: + properties["$ai_cache_creation_input_tokens"] = attrs["cache_write_tokens"] + + # Cost + if attrs.get("input_cost_usd") is not None: + properties["$ai_input_cost_usd"] = attrs["input_cost_usd"] + if attrs.get("output_cost_usd") is not None: + properties["$ai_output_cost_usd"] = attrs["output_cost_usd"] + if attrs.get("total_cost_usd") is not None: + properties["$ai_total_cost_usd"] = attrs["total_cost_usd"] + + # Timing + if latency is not None: + properties["$ai_latency"] = latency + + # Error + if is_error: + properties["$ai_is_error"] = is_error + if error_message: + properties["$ai_error"] = error_message + + # Model parameters + if attrs.get("temperature") is not None: + properties["$ai_temperature"] = attrs["temperature"] + if attrs.get("max_tokens") is not None: + properties["$ai_max_tokens"] = attrs["max_tokens"] + if attrs.get("stream") is not None: + properties["$ai_stream"] = attrs["stream"] + + # Content (handle both direct input/output and prompt/completion) + # Ensure $ai_input is array of messages + content_input = attrs.get("input") or attrs.get("prompt") + if content_input: + if isinstance(content_input, list): + properties["$ai_input"] = content_input + elif isinstance(content_input, dict): + properties["$ai_input"] = [content_input] + elif isinstance(content_input, str): + try: + parsed = json.loads(content_input) + properties["$ai_input"] = ( + parsed if isinstance(parsed, list) else [{"role": "user", "content": content_input}] + ) + except (json.JSONDecodeError, TypeError): + properties["$ai_input"] = [{"role": "user", "content": content_input}] + else: + properties["$ai_input"] = [{"role": "user", "content": str(content_input)}] + + # Ensure $ai_output_choices is array of choices + content_output = attrs.get("output") or attrs.get("completion") + if content_output: + if isinstance(content_output, list): + properties["$ai_output_choices"] = content_output + elif isinstance(content_output, dict): + properties["$ai_output_choices"] = [content_output] + elif isinstance(content_output, str): + try: + parsed = json.loads(content_output) + properties["$ai_output_choices"] = ( + parsed if isinstance(parsed, list) else [{"role": "assistant", "content": content_output}] + ) + except (json.JSONDecodeError, TypeError): + properties["$ai_output_choices"] = [{"role": "assistant", "content": content_output}] + else: + properties["$ai_output_choices"] = [{"role": "assistant", "content": str(content_output)}] + + # Span name + if span.get("name"): + properties["$ai_span_name"] = span["name"] + + # Metadata + properties["$ai_otel_transformer_version"] = OTEL_TRANSFORMER_VERSION + properties["$ai_otel_span_kind"] = str(span.get("kind", 0)) + properties["$ai_otel_status_code"] = str(status.get("code", 0)) + + # Resource attributes (service name, etc.) 
+ if resource.get("service.name"): + properties["$ai_service_name"] = resource["service.name"] + + # Instrumentation scope + properties["$ai_instrumentation_scope_name"] = scope.get("name", "unknown") + if scope.get("version"): + properties["$ai_instrumentation_scope_version"] = scope["version"] + + # Extract tool definitions from llm.request.functions.* attributes + tools = _extract_tools_from_attributes(attributes) + if tools: + properties["$ai_tools"] = tools + + # Add remaining span attributes (not already mapped) + mapped_keys = { + "posthog.ai.model", + "posthog.ai.provider", + "gen_ai.system", + "gen_ai.request.model", + "gen_ai.response.model", + "gen_ai.operation.name", + "gen_ai.usage.input_tokens", + "gen_ai.usage.output_tokens", + "gen_ai.prompt", + "gen_ai.completion", + "service.name", + "input", # Generic input (e.g., Mastra for Laminar compatibility) + "output", # Generic output (e.g., Mastra for Laminar compatibility) + } + + for key, value in attributes.items(): + if key not in mapped_keys and not key.startswith("posthog.ai.") and not key.startswith("gen_ai."): + # Skip llm.request.functions.* as they're now in $ai_tools + if not key.startswith("llm.request.functions."): + # Add unmapped attributes with prefix + properties[f"otel.{key}"] = value + + return properties + + +def _extract_tools_from_attributes(attributes: dict[str, Any]) -> list[dict[str, Any]] | None: + """ + Extract tool definitions from llm.request.functions.* attributes. + + Converts flat attributes like: + llm.request.functions.0.name = "get_weather" + llm.request.functions.0.description = "Get weather" + llm.request.functions.0.parameters = "{...}" + + Into structured array: + [{"name": "get_weather", "description": "Get weather", "input_schema": {...}}] + """ + from collections import defaultdict + + tools_by_index: dict[int, dict[str, Any]] = defaultdict(dict) + + for key, value in attributes.items(): + if not key.startswith("llm.request.functions."): + continue + + # Parse: llm.request.functions.0.name -> index=0, field=name + parts = key[len("llm.request.functions.") :].split(".", 1) + if len(parts) != 2: + continue + + try: + index = int(parts[0]) + field = parts[1] + + # Map OTEL field names to PostHog SDK format + if field == "parameters": + # Parse JSON string to dict for input_schema + try: + tools_by_index[index]["input_schema"] = json.loads(value) if isinstance(value, str) else value + except (json.JSONDecodeError, TypeError): + tools_by_index[index]["input_schema"] = value + else: + tools_by_index[index][field] = value + + except (ValueError, IndexError): + continue + + if not tools_by_index: + return None + + # Convert to sorted list + return [tools_by_index[i] for i in sorted(tools_by_index.keys())] + + +def determine_event_type(span: dict[str, Any], attrs: dict[str, Any], scope: dict[str, Any] | None = None) -> str: + """Determine AI event type from span.""" + scope = scope or {} + op_name = attrs.get("operation_name", "").lower() + + # Check operation name first (highest priority) + if op_name in ("chat", "completion"): + return "$ai_generation" + elif op_name in ("embedding", "embeddings"): + return "$ai_embedding" + + # Check if span has LLM-specific attributes (provider, model, tokens) + # These indicate it's an actual LLM generation, not just a wrapper span + has_llm_attrs = bool( + attrs.get("provider") and attrs.get("model") and (attrs.get("input_tokens") is not None or attrs.get("prompt")) + ) + if has_llm_attrs: + return "$ai_generation" + + # Check if span is root (no parent) + 
# For V1 frameworks, root spans should be $ai_span, not $ai_trace + # $ai_trace events get filtered out by TraceQueryRunner, breaking tree hierarchy + if not span.get("parent_span_id"): + provider = detect_provider(span, scope) + if provider: + pattern = provider.get_instrumentation_pattern() + if pattern == OtelInstrumentationPattern.V1_ATTRIBUTES: + # V1 frameworks: root span should be $ai_span (will be included in tree) + return "$ai_span" + # V2 frameworks or unknown: root is $ai_trace (separate from event tree) + return "$ai_trace" + + # Default to generic span + return "$ai_span" + + +def calculate_timestamp(span: dict[str, Any]) -> str: + """Calculate timestamp from span start time.""" + start_nanos = int(span.get("start_time_unix_nano", 0)) + millis = start_nanos // 1_000_000 + return datetime.fromtimestamp(millis / 1000, tz=UTC).isoformat() + + +def calculate_latency(span: dict[str, Any]) -> float | None: + """Calculate latency in seconds from span start/end time.""" + end_nanos = span.get("end_time_unix_nano") + if not end_nanos: + return None + + start_nanos = int(span.get("start_time_unix_nano", 0)) + end_nanos = int(end_nanos) + duration_nanos = end_nanos - start_nanos + + # Convert to seconds + return duration_nanos / 1_000_000_000 + + +def extract_distinct_id(resource: dict[str, Any], baggage: dict[str, str]) -> str: + """Extract distinct_id from resource or baggage.""" + # Try resource attributes + user_id = resource.get("user.id") or resource.get("enduser.id") or resource.get("posthog.distinct_id") + + if user_id and isinstance(user_id, str): + return user_id + + # Try baggage + if baggage.get("user_id"): + return baggage["user_id"] + if baggage.get("distinct_id"): + return baggage["distinct_id"] + + # Default to anonymous + return "anonymous"
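To close, a usage sketch of the transformer on a v1-style span (prompt and completion in attributes, so the event merger and Redis are bypassed). The `gen_ai.*` attribute names are assumptions about what `extract_genai_attributes` maps to the internal `model`/`prompt`/`completion` names; the values are illustrative:

```python
from products.llm_analytics.backend.api.otel.transformer import transform_span_to_ai_event

# Assumption: extract_genai_attributes maps gen_ai.prompt/gen_ai.completion to
# prompt/completion, so the span is detected as v1 and sent immediately.
span = {
    "trace_id": "a6b23ecb43aa99ff43ff70948b0a377f",
    "span_id": "fee4e58c7137b7ef",
    "parent_span_id": None,
    "name": "openai.chat",
    "kind": 3,
    "status": {"code": 1},  # SPAN_STATUS_OK
    "start_time_unix_nano": 1_700_000_000_000_000_000,
    "end_time_unix_nano": 1_700_000_001_500_000_000,
    "attributes": {
        "gen_ai.system": "openai",
        "gen_ai.operation.name": "chat",
        "gen_ai.request.model": "gpt-4o-mini",
        "gen_ai.usage.input_tokens": 264,
        "gen_ai.usage.output_tokens": 25,
        "gen_ai.prompt": '[{"role": "user", "content": "Hi there!"}]',
        "gen_ai.completion": '[{"role": "assistant", "content": "Hello!"}]',
    },
}
resource = {"service.name": "demo-service", "posthog.distinct_id": "user-42"}
scope = {"name": "opentelemetry.instrumentation.openai", "version": "0.1.0"}

event = transform_span_to_ai_event(span, resource, scope)
# Expected shape, if the extractor maps the keys above: event["event"] is
# "$ai_generation" (operation name "chat"), properties["$ai_latency"] == 1.5,
# distinct_id == "user-42" (from posthog.distinct_id), and a deterministic
# uuid5 of "<trace_id>:<span_id>" so a later log event for the same span
# deduplicates against this one.
```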