Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions headroom/proxy/handlers/anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -1251,6 +1251,7 @@ async def _finalize_pre_upstream() -> None:
tags,
optimization_latency,
pipeline_timing=pipeline_timing,
original_messages=original_messages,
)
else:
async with stage_timer.measure("upstream_connect"):
Expand Down Expand Up @@ -1346,7 +1347,16 @@ async def _finalize_pre_upstream() -> None:
tags=tags,
cache_hit=False,
transforms_applied=transforms_applied,
request_messages=body.get("messages")
# `original_messages` is the pre-compression
# snapshot from line 724; `body["messages"]`
# was mutated in-place at line 1189 with the
# compressed (optimized) list that was sent
# upstream. Gated by the same flag so the
# two sides stay symmetric.
request_messages=original_messages
if self.config.log_full_messages
else None,
compressed_messages=body.get("messages")
if self.config.log_full_messages
else None,
turn_id=compute_turn_id(
Expand Down Expand Up @@ -1819,7 +1829,15 @@ async def api_call_fn(
cache_hit=cache_hit,
transforms_applied=transforms_applied,
waste_signals=waste_signals_dict,
request_messages=messages
# See comment at the Bedrock log site above:
# `original_messages` (line 724) is the
# pre-compression snapshot, `body["messages"]`
# is what was sent upstream after in-place
# mutation at line 1189.
request_messages=original_messages
if self.config.log_full_messages
else None,
compressed_messages=body.get("messages")
if self.config.log_full_messages
else None,
turn_id=compute_turn_id(
Expand Down
20 changes: 18 additions & 2 deletions headroom/proxy/handlers/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,14 @@ async def _finalize_stream_response(
tags=tags or {},
cache_hit=False,
transforms_applied=transforms_applied,
request_messages=body.get("messages")
# `original_messages` (function param) is the pre-compression
# snapshot threaded in by the caller; `body["messages"]`
# was mutated in-place by the caller with the compressed
# list actually sent upstream. Both gated by the same flag.
request_messages=original_messages
if getattr(self.config, "log_full_messages", False)
else None,
compressed_messages=body.get("messages")
if getattr(self.config, "log_full_messages", False)
else None,
)
Expand Down Expand Up @@ -937,6 +944,7 @@ async def _stream_response_bedrock(
tags: dict[str, str],
optimization_latency: float,
pipeline_timing: dict[str, float] | None = None,
original_messages: list[dict] | None = None,
) -> StreamingResponse:
"""Stream response from Bedrock backend with metrics tracking.

Expand Down Expand Up @@ -1041,7 +1049,15 @@ async def generate():
tags=tags,
cache_hit=False,
transforms_applied=transforms_applied,
request_messages=body.get("messages")
# `original_messages` is threaded in by the caller
# (anthropic.py _stream_response_bedrock call site).
# `body["messages"]` is the compressed list after
# the caller's in-place mutation. Both gated by
# `log_full_messages`.
request_messages=original_messages
if self.config.log_full_messages
else None,
compressed_messages=body.get("messages")
if self.config.log_full_messages
else None,
turn_id=compute_turn_id(
Expand Down
5 changes: 5 additions & 0 deletions headroom/proxy/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ class RequestLog:

# Request/Response (optional, for debugging)
request_messages: list[dict] | None = None
# Messages after compression — i.e. what was actually sent upstream. Paired
# with `request_messages` so consumers can diff the two sides of the
# compression to see exactly what was stripped, replaced, or kept. Gated
# by the same `log_full_messages` flag as the original request.
compressed_messages: list[dict] | None = None
response_content: str | None = None
error: str | None = None

Expand Down
8 changes: 6 additions & 2 deletions headroom/proxy/request_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,22 @@ def log(self, entry: RequestLog):
log_dict = asdict(entry)
if not self.log_full_messages:
log_dict.pop("request_messages", None)
log_dict.pop("compressed_messages", None)
log_dict.pop("response_content", None)
f.write(json.dumps(log_dict) + "\n")
except OSError:
pass # Graceful degradation: memory-only logging continues

def get_recent(self, n: int = 100) -> list[dict]:
    """Return up to the *n* most recent log entries as plain dicts.

    The bulky payload fields (``request_messages``, ``compressed_messages``,
    and ``response_content``) are stripped so the result stays lightweight.
    """
    omitted = ("request_messages", "compressed_messages", "response_content")
    # deque has no slice support, so materialize it as a list first.
    recent = list(self._logs)[-n:]
    results = []
    for entry in recent:
        record = asdict(entry)
        for key in omitted:
            record.pop(key, None)
        results.append(record)
    return results
Expand Down Expand Up @@ -115,6 +117,8 @@ def get_memory_stats(self) -> ComponentStats:
# Messages and response can be large
if log_entry.request_messages:
size_bytes += sys.getsizeof(log_entry.request_messages)
if log_entry.compressed_messages:
size_bytes += sys.getsizeof(log_entry.compressed_messages)
if log_entry.response_content:
size_bytes += len(log_entry.response_content)

Expand Down
Loading