From 4ac2d927209436f57d62dd4a0b196e73d9ef5fcb Mon Sep 17 00:00:00 2001 From: Steven Enamakel Date: Sat, 11 Apr 2026 23:31:42 -0700 Subject: [PATCH 1/6] feat(transcript): implement session transcript management for KV cache stability - Added support for session transcript persistence, allowing the exact `Vec` to be stored as human-readable `.md` files. - Introduced methods for loading previous transcripts to pre-populate messages for KV cache reuse, enhancing session continuity. - Updated `AgentBuilder` to include an agent definition name for transcript file naming. - Refactored `Agent` to handle transcript loading and persistence during session turns, ensuring byte-identical message handling across sessions. - Created a new `transcript.rs` module to encapsulate transcript-related functionality, improving code organization and maintainability. --- .../agent/harness/session/builder.rs | 14 + src/openhuman/agent/harness/session/mod.rs | 1 + .../agent/harness/session/transcript.rs | 503 ++++++++++++++++++ src/openhuman/agent/harness/session/turn.rs | 142 ++++- src/openhuman/agent/harness/session/types.rs | 16 +- 5 files changed, 674 insertions(+), 2 deletions(-) create mode 100644 src/openhuman/agent/harness/session/transcript.rs diff --git a/src/openhuman/agent/harness/session/builder.rs b/src/openhuman/agent/harness/session/builder.rs index 0c16abba9..93081309a 100644 --- a/src/openhuman/agent/harness/session/builder.rs +++ b/src/openhuman/agent/harness/session/builder.rs @@ -44,6 +44,7 @@ impl AgentBuilder { learning_enabled: false, event_session_id: None, event_channel: None, + agent_definition_name: None, } } @@ -183,6 +184,14 @@ impl AgentBuilder { self } + /// Sets the human-readable agent definition name used as the + /// `{agent}` prefix in session transcript filenames + /// (`sessions/DDMMYYYY/{agent}_{index}.md`). + pub fn agent_definition_name(mut self, name: impl Into) -> Self { + self.agent_definition_name = Some(name.into()); + self + } + /// Validates the configuration and builds the `Agent` instance. pub fn build(self) -> Result { let tools = self @@ -275,6 +284,11 @@ impl AgentBuilder { .event_session_id .unwrap_or_else(|| "standalone".to_string()), event_channel: self.event_channel.unwrap_or_else(|| "internal".to_string()), + agent_definition_name: self + .agent_definition_name + .unwrap_or_else(|| "main".to_string()), + session_transcript_path: None, + cached_transcript_messages: None, context, }) } diff --git a/src/openhuman/agent/harness/session/mod.rs b/src/openhuman/agent/harness/session/mod.rs index 4f54867b0..4f7f37225 100644 --- a/src/openhuman/agent/harness/session/mod.rs +++ b/src/openhuman/agent/harness/session/mod.rs @@ -22,6 +22,7 @@ mod builder; mod runtime; +pub(crate) mod transcript; mod turn; mod types; diff --git a/src/openhuman/agent/harness/session/transcript.rs b/src/openhuman/agent/harness/session/transcript.rs new file mode 100644 index 000000000..eeacbfe1d --- /dev/null +++ b/src/openhuman/agent/harness/session/transcript.rs @@ -0,0 +1,503 @@ +//! Session transcript persistence for KV cache stability. +//! +//! Stores the **exact** `Vec` sent to the LLM provider as +//! a human-readable `.md` file. On session resume the transcript is read +//! back to produce byte-identical messages, ensuring the inference +//! backend's KV cache prefix is reused rather than re-prefilled. +//! +//! ## File format +//! +//! ```text +//! +//! +//! +//! +//! +//! +//! +//! +//! +//! ``` +//! +//! Content between `` and `` is the **exact** +//! `ChatMessage.content`. The single escape: any literal `` +//! inside content is written as `` and reversed on read. +//! +//! ## Storage layout +//! +//! ```text +//! {workspace}/sessions/DDMMYYYY/{agent}_{index}.md +//! ``` + +use crate::openhuman::providers::ChatMessage; +use anyhow::{Context, Result}; +use std::fmt::Write as FmtWrite; +use std::fs; +use std::path::{Path, PathBuf}; + +const MSG_OPEN_PREFIX: &str = ""; +const MSG_CLOSE: &str = ""; +const MSG_CLOSE_ESCAPED: &str = ""; + +/// Metadata header for a session transcript file. +#[derive(Debug, Clone)] +pub struct TranscriptMeta { + pub agent_name: String, + pub dispatcher: String, + pub cache_boundary: Option, + pub created: String, + pub updated: String, + pub turn_count: usize, +} + +/// A parsed session transcript: metadata + exact message array. +#[derive(Debug, Clone)] +pub struct SessionTranscript { + pub meta: TranscriptMeta, + pub messages: Vec, +} + +// ── Write ──────────────────────────────────────────────────────────── + +/// Write a session transcript to `path`. Full rewrite (not append) +/// because context reduction may have removed earlier messages. +pub fn write_transcript(path: &Path, messages: &[ChatMessage], meta: &TranscriptMeta) -> Result<()> { + if let Some(parent) = path.parent() { + fs::create_dir_all(parent) + .with_context(|| format!("create transcript dir {}", parent.display()))?; + } + + let mut buf = String::new(); + + // Header + buf.push_str("\n"); + + // Messages + for msg in messages { + buf.push('\n'); + let _ = write!(buf, "{}{}{}\n", MSG_OPEN_PREFIX, msg.role, MSG_OPEN_SUFFIX); + buf.push_str(&escape_content(&msg.content)); + buf.push('\n'); + buf.push_str(MSG_CLOSE); + buf.push('\n'); + } + + fs::write(path, buf.as_bytes()) + .with_context(|| format!("write transcript {}", path.display()))?; + + log::debug!( + "[transcript] wrote {} messages to {}", + messages.len(), + path.display() + ); + + Ok(()) +} + +// ── Read ───────────────────────────────────────────────────────────── + +/// Read a session transcript from `path` and return the exact messages. +pub fn read_transcript(path: &Path) -> Result { + let raw = fs::read_to_string(path) + .with_context(|| format!("read transcript {}", path.display()))?; + + let meta = parse_meta(&raw) + .with_context(|| format!("parse transcript meta in {}", path.display()))?; + + let messages = parse_messages(&raw) + .with_context(|| format!("parse transcript messages in {}", path.display()))?; + + log::debug!( + "[transcript] loaded {} messages from {}", + messages.len(), + path.display() + ); + + Ok(SessionTranscript { meta, messages }) +} + +// ── Path resolution ────────────────────────────────────────────────── + +/// Resolve a new transcript path under +/// `{workspace}/sessions/DDMMYYYY/{agent}_{index}.md`. +/// +/// Creates the date directory if needed. Index = max existing + 1. +pub fn resolve_new_transcript_path(workspace_dir: &Path, agent_name: &str) -> Result { + let date_dir = today_session_dir(workspace_dir); + fs::create_dir_all(&date_dir) + .with_context(|| format!("create session dir {}", date_dir.display()))?; + + let sanitized = sanitize_agent_name(agent_name); + let next_index = next_index(&date_dir, &sanitized)?; + let filename = format!("{}_{}.md", sanitized, next_index); + + Ok(date_dir.join(filename)) +} + +/// Find the most recent transcript for `agent_name`. +/// +/// Searches today's directory first, then yesterday's. Returns the +/// file with the highest index (most recent session). +pub fn find_latest_transcript(workspace_dir: &Path, agent_name: &str) -> Option { + let sanitized = sanitize_agent_name(agent_name); + let sessions_root = workspace_dir.join("sessions"); + + // Search today first, then yesterday + let today = chrono::Local::now().format("%d%m%Y").to_string(); + let yesterday = (chrono::Local::now() - chrono::Duration::days(1)) + .format("%d%m%Y") + .to_string(); + + for date_str in [&today, &yesterday] { + let dir = sessions_root.join(date_str); + if !dir.is_dir() { + continue; + } + if let Some(path) = latest_in_dir(&dir, &sanitized) { + return Some(path); + } + } + + None +} + +// ── Internals ──────────────────────────────────────────────────────── + +fn escape_content(content: &str) -> String { + content.replace(MSG_CLOSE, MSG_CLOSE_ESCAPED) +} + +fn unescape_content(content: &str) -> String { + content.replace(MSG_CLOSE_ESCAPED, MSG_CLOSE) +} + +fn parse_meta(raw: &str) -> Result { + let header_start = raw + .find("") + .context("unclosed session_transcript header")?; + let header = &raw[header_start..header_start + header_end + 3]; + + let get = |key: &str| -> Option { + header + .lines() + .find_map(|line| { + let line = line.trim(); + if line.starts_with(&format!("{key}:")) { + Some(line[key.len() + 1..].trim().to_string()) + } else { + None + } + }) + }; + + Ok(TranscriptMeta { + agent_name: get("agent").unwrap_or_else(|| "unknown".into()), + dispatcher: get("dispatcher").unwrap_or_else(|| "native".into()), + cache_boundary: get("cache_boundary").and_then(|s| s.parse().ok()), + created: get("created").unwrap_or_default(), + updated: get("updated").unwrap_or_default(), + turn_count: get("turn_count").and_then(|s| s.parse().ok()).unwrap_or(0), + }) +} + +fn parse_messages(raw: &str) -> Result> { + let mut messages = Vec::new(); + let mut search_from = 0; + + loop { + // Find next opening tag + let Some(open_start) = raw[search_from..].find(MSG_OPEN_PREFIX) else { + break; + }; + let open_start = search_from + open_start; + let after_prefix = open_start + MSG_OPEN_PREFIX.len(); + + // Extract role from between the quotes + let Some(role_end) = raw[after_prefix..].find(MSG_OPEN_SUFFIX) else { + break; + }; + let role = raw[after_prefix..after_prefix + role_end].to_string(); + + // Content starts after the opening tag + newline + let content_start = after_prefix + role_end + MSG_OPEN_SUFFIX.len(); + let content_start = if raw[content_start..].starts_with('\n') { + content_start + 1 + } else { + content_start + }; + + // Find closing tag + let close_tag = format!("\n{MSG_CLOSE}"); + let Some(content_end_rel) = raw[content_start..].find(&close_tag) else { + // Try without leading newline for empty content + let Some(content_end_rel) = raw[content_start..].find(MSG_CLOSE) else { + break; + }; + let content = &raw[content_start..content_start + content_end_rel]; + messages.push(ChatMessage { + role, + content: unescape_content(content), + }); + search_from = content_start + content_end_rel + MSG_CLOSE.len(); + continue; + }; + + let content = &raw[content_start..content_start + content_end_rel]; + messages.push(ChatMessage { + role, + content: unescape_content(content), + }); + + search_from = content_start + content_end_rel + close_tag.len(); + } + + Ok(messages) +} + +fn today_session_dir(workspace_dir: &Path) -> PathBuf { + let date = chrono::Local::now().format("%d%m%Y").to_string(); + workspace_dir.join("sessions").join(date) +} + +fn sanitize_agent_name(name: &str) -> String { + name.chars() + .map(|c| if c.is_alphanumeric() || c == '-' || c == '_' { c } else { '_' }) + .collect() +} + +fn next_index(dir: &Path, agent_prefix: &str) -> Result { + let prefix = format!("{}_", agent_prefix); + let mut max_idx: Option = None; + + if let Ok(entries) = fs::read_dir(dir) { + for entry in entries.flatten() { + let name = entry.file_name(); + let name = name.to_string_lossy(); + if name.starts_with(&prefix) && name.ends_with(".md") { + let idx_str = &name[prefix.len()..name.len() - 3]; + if let Ok(idx) = idx_str.parse::() { + max_idx = Some(max_idx.map_or(idx, |m: usize| m.max(idx))); + } + } + } + } + + Ok(max_idx.map_or(0, |m| m + 1)) +} + +fn latest_in_dir(dir: &Path, agent_prefix: &str) -> Option { + let prefix = format!("{}_", agent_prefix); + let mut best: Option<(usize, PathBuf)> = None; + + let entries = fs::read_dir(dir).ok()?; + for entry in entries.flatten() { + let name = entry.file_name(); + let name_str = name.to_string_lossy(); + if name_str.starts_with(&prefix) && name_str.ends_with(".md") { + let idx_str = &name_str[prefix.len()..name_str.len() - 3]; + if let Ok(idx) = idx_str.parse::() { + if best.as_ref().map_or(true, |(best_idx, _)| idx > *best_idx) { + best = Some((idx, entry.path())); + } + } + } + } + + best.map(|(_, path)| path) +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + + fn sample_messages() -> Vec { + vec![ + ChatMessage::system("You are a helpful assistant.\n\n## Tools\n\n- **shell**: Run commands"), + ChatMessage::user("What files are in /tmp?"), + ChatMessage::assistant("Let me check that for you."), + ChatMessage::tool("{\"tool_call_id\":\"tc1\",\"content\":\"file1.txt\\nfile2.txt\"}"), + ChatMessage::assistant("There are two files: file1.txt and file2.txt."), + ] + } + + fn sample_meta() -> TranscriptMeta { + TranscriptMeta { + agent_name: "code_executor".into(), + dispatcher: "native".into(), + cache_boundary: Some(1847), + created: "2026-04-11T14:30:00Z".into(), + updated: "2026-04-11T14:35:22Z".into(), + turn_count: 3, + } + } + + #[test] + fn round_trip_produces_byte_identical_messages() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("test.md"); + let messages = sample_messages(); + let meta = sample_meta(); + + write_transcript(&path, &messages, &meta).unwrap(); + let loaded = read_transcript(&path).unwrap(); + + assert_eq!(loaded.messages.len(), messages.len()); + for (original, loaded) in messages.iter().zip(loaded.messages.iter()) { + assert_eq!(original.role, loaded.role, "role mismatch"); + assert_eq!( + original.content, loaded.content, + "content mismatch for role={}", + original.role + ); + } + } + + #[test] + fn escaping_survives_close_tag_in_content() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("escape_test.md"); + let messages = vec![ + ChatMessage::system("Normal system prompt"), + ChatMessage::user( + "Here is some tricky content:\n\nand more after", + ), + ChatMessage::assistant("Got it, that had a in it."), + ]; + let meta = sample_meta(); + + write_transcript(&path, &messages, &meta).unwrap(); + let loaded = read_transcript(&path).unwrap(); + + assert_eq!(loaded.messages.len(), 3); + assert_eq!(loaded.messages[1].content, messages[1].content); + assert_eq!(loaded.messages[2].content, messages[2].content); + } + + #[test] + fn meta_round_trip() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("meta_test.md"); + let meta = sample_meta(); + + write_transcript(&path, &[], &meta).unwrap(); + let loaded = read_transcript(&path).unwrap(); + + assert_eq!(loaded.meta.agent_name, "code_executor"); + assert_eq!(loaded.meta.dispatcher, "native"); + assert_eq!(loaded.meta.cache_boundary, Some(1847)); + assert_eq!(loaded.meta.created, "2026-04-11T14:30:00Z"); + assert_eq!(loaded.meta.updated, "2026-04-11T14:35:22Z"); + assert_eq!(loaded.meta.turn_count, 3); + } + + #[test] + fn meta_without_cache_boundary() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("no_boundary.md"); + let mut meta = sample_meta(); + meta.cache_boundary = None; + + write_transcript(&path, &[], &meta).unwrap(); + let loaded = read_transcript(&path).unwrap(); + + assert_eq!(loaded.meta.cache_boundary, None); + } + + #[test] + fn path_resolution_creates_dir_and_increments_index() { + let dir = TempDir::new().unwrap(); + let workspace = dir.path(); + + let path0 = resolve_new_transcript_path(workspace, "main").unwrap(); + assert!(path0.to_string_lossy().contains("main_0.md")); + // Write something so the next call sees it + fs::write(&path0, "placeholder").unwrap(); + + let path1 = resolve_new_transcript_path(workspace, "main").unwrap(); + assert!(path1.to_string_lossy().contains("main_1.md")); + } + + #[test] + fn sanitize_agent_name_strips_special_chars() { + assert_eq!(sanitize_agent_name("code_executor"), "code_executor"); + assert_eq!(sanitize_agent_name("my agent!"), "my_agent_"); + assert_eq!(sanitize_agent_name("agent-v2"), "agent-v2"); + } + + #[test] + fn find_latest_returns_highest_index() { + let dir = TempDir::new().unwrap(); + let date = chrono::Local::now().format("%d%m%Y").to_string(); + let session_dir = dir.path().join("sessions").join(&date); + fs::create_dir_all(&session_dir).unwrap(); + + fs::write(session_dir.join("main_0.md"), "a").unwrap(); + fs::write(session_dir.join("main_2.md"), "c").unwrap(); + fs::write(session_dir.join("main_1.md"), "b").unwrap(); + fs::write(session_dir.join("other_0.md"), "x").unwrap(); + + let latest = find_latest_transcript(dir.path(), "main"); + assert!(latest.is_some()); + let latest = latest.unwrap(); + assert!(latest.to_string_lossy().contains("main_2.md")); + } + + #[test] + fn find_latest_returns_none_when_no_sessions() { + let dir = TempDir::new().unwrap(); + assert!(find_latest_transcript(dir.path(), "main").is_none()); + } + + #[test] + fn empty_content_message_round_trips() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("empty.md"); + let messages = vec![ + ChatMessage::system("prompt"), + ChatMessage::assistant(""), + ChatMessage::user("hi"), + ]; + let meta = sample_meta(); + + write_transcript(&path, &messages, &meta).unwrap(); + let loaded = read_transcript(&path).unwrap(); + + assert_eq!(loaded.messages.len(), 3); + assert_eq!(loaded.messages[1].content, ""); + } + + #[test] + fn multiline_content_preserves_exact_whitespace() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("whitespace.md"); + let content = " leading spaces\n\n\nmultiple blanks\n trailing "; + let messages = vec![ChatMessage::user(content)]; + let meta = sample_meta(); + + write_transcript(&path, &messages, &meta).unwrap(); + let loaded = read_transcript(&path).unwrap(); + + assert_eq!(loaded.messages[0].content, content); + } +} diff --git a/src/openhuman/agent/harness/session/turn.rs b/src/openhuman/agent/harness/session/turn.rs index a6dba1070..edb0b41d6 100644 --- a/src/openhuman/agent/harness/session/turn.rs +++ b/src/openhuman/agent/harness/session/turn.rs @@ -19,6 +19,7 @@ //! - [`Agent::spawn_session_memory_extraction`] — the fire-and-forget //! background archivist fork. +use super::transcript; use super::types::Agent; use crate::core::event_bus::{publish_global, DomainEvent}; use crate::openhuman::agent::dispatcher::{ParsedToolCall, ToolExecutionResult}; @@ -50,6 +51,14 @@ impl Agent { self.history.len(), self.config.max_tool_iterations ); + // ── Session transcript resume ───────────────────────────────── + // On a fresh session (empty history), look for a previous + // transcript to pre-populate the exact provider messages for + // KV cache prefix reuse. + if self.history.is_empty() && self.cached_transcript_messages.is_none() { + self.try_load_session_transcript(); + } + if self.history.is_empty() { // Learned context is only baked into the system prompt on the // very first turn — once the history is non-empty we reuse the @@ -145,6 +154,10 @@ impl Agent { // Collect tool call records across all iterations for post-turn hooks let mut all_tool_records: Vec = Vec::new(); + // Capture the last `Vec` sent to the provider so we + // can persist it as a session transcript after the turn completes. + let mut last_provider_messages: Option> = None; + let turn_body = async { for iteration in 0..self.config.max_tool_iterations { log::info!( @@ -218,7 +231,27 @@ impl Agent { } } - let messages = self.tool_dispatcher.to_provider_messages(&self.history); + // Use cached transcript messages on the first iteration of + // a resumed session to provide a byte-identical prefix for + // KV cache reuse. After `.take()` the cache is consumed; + // subsequent iterations rebuild from history normally. + let messages = if let Some(mut cached) = self.cached_transcript_messages.take() { + // Append only the delta (new user message) from the + // end of the current history. + let new_tail = self.tool_dispatcher + .to_provider_messages(&self.history[self.history.len().saturating_sub(1)..]); + cached.extend(new_tail); + log::info!( + "[transcript] resumed from cached transcript prefix_len={} new_tail={}", + cached.len() - 1, + 1 + ); + cached + } else { + self.tool_dispatcher.to_provider_messages(&self.history) + }; + last_provider_messages = Some(messages.clone()); + log::info!( "[agent] iteration {}/{} — sending request to provider model={}", iteration + 1, @@ -436,6 +469,15 @@ impl Agent { // the PARENT_CONTEXT task-local. let result = harness::with_parent_context(parent_context, turn_body).await; + // ── Session transcript persistence ──────────────────────────── + // Persist the exact provider messages so a future session can + // resume with a byte-identical prefix for KV cache reuse. + if result.is_ok() { + if let Some(ref messages) = last_provider_messages { + self.persist_session_transcript(messages); + } + } + // ── Session-memory extraction (stage 5) ─────────────────────── // // If the pipeline's deltas have crossed all three thresholds @@ -781,6 +823,104 @@ impl Agent { self.context.build_system_prompt_with_cache_metadata(&ctx) } + // ───────────────────────────────────────────────────────────────── + // Session transcript helpers + // ───────────────────────────────────────────────────────────────── + + /// Try to load a previous session transcript for KV cache resume. + /// + /// Best-effort: failures are logged and silently ignored. + pub(super) fn try_load_session_transcript(&mut self) { + match transcript::find_latest_transcript(&self.workspace_dir, &self.agent_definition_name) { + Some(path) => { + log::info!( + "[transcript] found previous transcript path={}", + path.display() + ); + match transcript::read_transcript(&path) { + Ok(session) => { + if session.messages.is_empty() { + log::debug!("[transcript] previous transcript is empty — skipping resume"); + return; + } + // Restore the cache boundary from the transcript + // metadata so the provider request carries the + // same offset as the original session. + self.system_prompt_cache_boundary = session.meta.cache_boundary; + log::info!( + "[transcript] loaded {} messages for resume (cache_boundary={:?})", + session.messages.len(), + session.meta.cache_boundary + ); + self.cached_transcript_messages = Some(session.messages); + } + Err(err) => { + log::warn!( + "[transcript] failed to parse previous transcript {}: {err}", + path.display() + ); + } + } + } + None => { + log::debug!( + "[transcript] no previous transcript found for agent={}", + self.agent_definition_name + ); + } + } + } + + /// Persist the exact provider messages as a session transcript. + /// + /// Best-effort: failures are logged and silently ignored. The JSONL + /// conversation store remains the authoritative persistence layer; + /// session transcripts are an optimization for KV cache stability. + pub(super) fn persist_session_transcript(&mut self, messages: &[ChatMessage]) { + // Resolve the transcript path on first write. + if self.session_transcript_path.is_none() { + match transcript::resolve_new_transcript_path( + &self.workspace_dir, + &self.agent_definition_name, + ) { + Ok(path) => { + log::info!( + "[transcript] new session transcript path={}", + path.display() + ); + self.session_transcript_path = Some(path); + } + Err(err) => { + log::warn!("[transcript] failed to resolve transcript path: {err}"); + return; + } + } + } + + let path = self.session_transcript_path.as_ref().unwrap(); + let now = chrono::Utc::now().to_rfc3339(); + + let meta = transcript::TranscriptMeta { + agent_name: self.agent_definition_name.clone(), + dispatcher: if self.tool_dispatcher.should_send_tool_specs() { + "native".into() + } else { + "xml".into() + }, + cache_boundary: self.system_prompt_cache_boundary, + created: now.clone(), + updated: now, + turn_count: self.context.stats().session_memory_current_turn as usize, + }; + + if let Err(err) = transcript::write_transcript(path, messages, &meta) { + log::warn!( + "[transcript] failed to write transcript {}: {err}", + path.display() + ); + } + } + // ───────────────────────────────────────────────────────────────── // Session-memory extraction (stage 5 of the context pipeline) // ───────────────────────────────────────────────────────────────── diff --git a/src/openhuman/agent/harness/session/types.rs b/src/openhuman/agent/harness/session/types.rs index c4581f766..e3f7529c2 100644 --- a/src/openhuman/agent/harness/session/types.rs +++ b/src/openhuman/agent/harness/session/types.rs @@ -12,8 +12,9 @@ use crate::openhuman::agent::memory_loader::MemoryLoader; use crate::openhuman::context::prompt::SystemPromptBuilder; use crate::openhuman::context::ContextManager; use crate::openhuman::memory::Memory; -use crate::openhuman::providers::{ConversationMessage, Provider}; +use crate::openhuman::providers::{ChatMessage, ConversationMessage, Provider}; use crate::openhuman::tools::{Tool, ToolSpec}; +use std::path::PathBuf; use std::sync::Arc; /// An autonomous or semi-autonomous AI agent. @@ -58,6 +59,18 @@ pub struct Agent { pub(super) learning_enabled: bool, pub(super) event_session_id: String, pub(super) event_channel: String, + /// Human-readable agent definition name (e.g. `"main"`, + /// `"code_executor"`). Used as the `{agent}` component in session + /// transcript paths: `sessions/DDMMYYYY/{agent}_{index}.md`. + pub(super) agent_definition_name: String, + /// Resolved filesystem path for this session's transcript file. + /// Set on first write, reused for subsequent overwrites within the + /// same session. + pub(super) session_transcript_path: Option, + /// Messages loaded from a previous session transcript on resume. + /// Consumed once (via `.take()`) on the first turn to provide a + /// byte-identical prefix for KV cache reuse. + pub(super) cached_transcript_messages: Option>, /// Per-session [`ContextManager`] — owns the system-prompt /// builder, the layered reduction pipeline (tool-result budget → /// microcompact → autocompact signal → session-memory extraction @@ -93,6 +106,7 @@ pub struct AgentBuilder { pub(super) learning_enabled: bool, pub(super) event_session_id: Option, pub(super) event_channel: Option, + pub(super) agent_definition_name: Option, } impl Default for AgentBuilder { From be9755f04d47fa2eb3924ee41899b54b75916059 Mon Sep 17 00:00:00 2001 From: Steven Enamakel Date: Sat, 11 Apr 2026 23:47:01 -0700 Subject: [PATCH 2/6] feat(usage): enhance API response structure with usage and billing metadata - Added `usage` and `openhuman` fields to the `ApiChatResponse` struct to capture standard OpenAI usage metrics and OpenHuman backend metadata. - Introduced `ApiUsage`, `OpenHumanMeta`, and related structs to encapsulate detailed usage and billing information. - Updated `parse_native_response` and `extract_usage` methods to process and return usage data, improving the integration of usage tracking in chat responses. - Modified the `UsageInfo` struct in traits to include new fields for cached input tokens and charged amount, enhancing the overall usage reporting capabilities. --- src/openhuman/providers/compatible.rs | 120 ++++++++++++++++++++++++-- src/openhuman/providers/traits.rs | 8 ++ 2 files changed, 119 insertions(+), 9 deletions(-) diff --git a/src/openhuman/providers/compatible.rs b/src/openhuman/providers/compatible.rs index aa54bbe5e..4694ba9c4 100644 --- a/src/openhuman/providers/compatible.rs +++ b/src/openhuman/providers/compatible.rs @@ -5,6 +5,7 @@ use crate::openhuman::providers::traits::{ ChatMessage, ChatRequest as ProviderChatRequest, ChatResponse as ProviderChatResponse, Provider, StreamChunk, StreamError, StreamOptions, StreamResult, ToolCall as ProviderToolCall, + UsageInfo as ProviderUsageInfo, }; use async_trait::async_trait; use futures_util::{stream, StreamExt}; @@ -288,6 +289,12 @@ struct Message { #[derive(Debug, Deserialize)] struct ApiChatResponse { choices: Vec, + /// Standard OpenAI usage block. + #[serde(default)] + usage: Option, + /// OpenHuman backend metadata (usage + billing summary). + #[serde(default)] + openhuman: Option, } #[derive(Debug, Deserialize)] @@ -295,6 +302,52 @@ struct Choice { message: ResponseMessage, } +/// Standard OpenAI `usage` block on a chat completion response. +#[derive(Debug, Deserialize, Default)] +struct ApiUsage { + #[serde(default)] + prompt_tokens: u64, + #[serde(default)] + completion_tokens: u64, + #[serde(default)] + total_tokens: u64, + #[serde(default)] + prompt_tokens_details: Option, +} + +#[derive(Debug, Deserialize, Default)] +struct PromptTokensDetails { + #[serde(default)] + cached_tokens: u64, +} + +/// OpenHuman backend metadata appended to the response JSON. +#[derive(Debug, Deserialize, Default)] +struct OpenHumanMeta { + #[serde(default)] + usage: Option, + #[serde(default)] + billing: Option, +} + +#[derive(Debug, Deserialize, Default)] +struct OpenHumanUsage { + #[serde(default)] + input_tokens: u64, + #[serde(default)] + output_tokens: u64, + #[serde(default)] + total_tokens: u64, + #[serde(default)] + cached_input_tokens: u64, +} + +#[derive(Debug, Deserialize, Default)] +struct OpenHumanBilling { + #[serde(default)] + charged_amount_usd: f64, +} + /// Remove `...` blocks from model output. /// Some reasoning models (e.g. MiniMax) embed their chain-of-thought inline /// in the `content` field rather than a separate `reasoning_content` field. @@ -941,7 +994,20 @@ impl OpenAiCompatibleProvider { modified_messages } - fn parse_native_response(message: ResponseMessage) -> ProviderChatResponse { + fn parse_native_response(api_response: ApiChatResponse) -> ProviderChatResponse { + let usage = Self::extract_usage(&api_response); + + let message = match api_response.choices.into_iter().next() { + Some(choice) => choice.message, + None => { + return ProviderChatResponse { + text: None, + tool_calls: vec![], + usage, + }; + } + }; + let mut text = message.effective_content_optional(); let mut tool_calls = message .tool_calls @@ -990,10 +1056,49 @@ impl OpenAiCompatibleProvider { ProviderChatResponse { text, tool_calls, - usage: None, + usage, } } + /// Extract usage info from API response, preferring the OpenHuman + /// metadata block (which includes cache stats and billing) over the + /// standard OpenAI usage block. + fn extract_usage(resp: &ApiChatResponse) -> Option { + let oh = resp.openhuman.as_ref(); + let std_usage = resp.usage.as_ref(); + + // Need at least one source of token counts. + if oh.is_none() && std_usage.is_none() { + return None; + } + + let oh_usage = oh.and_then(|o| o.usage.as_ref()); + let oh_billing = oh.and_then(|o| o.billing.as_ref()); + + let input_tokens = oh_usage + .map(|u| u.input_tokens) + .or(std_usage.map(|u| u.prompt_tokens)) + .unwrap_or(0); + let output_tokens = oh_usage + .map(|u| u.output_tokens) + .or(std_usage.map(|u| u.completion_tokens)) + .unwrap_or(0); + let cached_input_tokens = oh_usage + .map(|u| u.cached_input_tokens) + .or(std_usage + .and_then(|u| u.prompt_tokens_details.as_ref()) + .map(|d| d.cached_tokens)) + .unwrap_or(0); + + Some(ProviderUsageInfo { + input_tokens, + output_tokens, + context_window: 0, + cached_input_tokens, + charged_amount_usd: oh_billing.map(|b| b.charged_amount_usd).unwrap_or(0.0), + }) + } + fn is_native_tool_schema_unsupported(status: reqwest::StatusCode, error: &str) -> bool { if !matches!( status, @@ -1462,14 +1567,11 @@ impl Provider for OpenAiCompatibleProvider { } let native_response: ApiChatResponse = response.json().await?; - let message = native_response - .choices - .into_iter() - .next() - .map(|choice| choice.message) - .ok_or_else(|| anyhow::anyhow!("No response from {}", self.name))?; + if native_response.choices.is_empty() { + anyhow::bail!("No response from {}", self.name); + } - Ok(Self::parse_native_response(message)) + Ok(Self::parse_native_response(native_response)) } fn supports_native_tools(&self) -> bool { diff --git a/src/openhuman/providers/traits.rs b/src/openhuman/providers/traits.rs index 76e8d68ac..c069804ce 100644 --- a/src/openhuman/providers/traits.rs +++ b/src/openhuman/providers/traits.rs @@ -58,6 +58,14 @@ pub struct UsageInfo { pub output_tokens: u64, /// Total context window size for the model (0 if unknown). pub context_window: u64, + /// Number of input tokens that were served from the KV cache + /// (returned by backends that support prompt caching, e.g. via + /// `openhuman.usage.cached_input_tokens` or + /// `prompt_tokens_details.cached_tokens`). + pub cached_input_tokens: u64, + /// Amount billed for this request in USD (from + /// `openhuman.billing.charged_amount_usd`). Zero when unavailable. + pub charged_amount_usd: f64, } /// An LLM response that may contain text, tool calls, or both. From 9186e2583f8c3d37a7c2f54223966d55e51169b0 Mon Sep 17 00:00:00 2001 From: Steven Enamakel Date: Sat, 11 Apr 2026 23:49:50 -0700 Subject: [PATCH 3/6] feat(provider): integrate usage extraction into OpenAiCompatibleProvider responses - Added usage extraction functionality to the `OpenAiCompatibleProvider`, enhancing the API response structure to include usage metrics. - Updated the `parse_native_response` method to utilize a new `wrap_message` function for improved testability and clarity in response handling. - Refactored tests to accommodate the new usage extraction logic, ensuring comprehensive coverage of the updated response structure. --- .../agent/harness/session/transcript.rs | 8 ++++++ src/openhuman/providers/compatible.rs | 26 +++++++++++++------ 2 files changed, 26 insertions(+), 8 deletions(-) diff --git a/src/openhuman/agent/harness/session/transcript.rs b/src/openhuman/agent/harness/session/transcript.rs index eeacbfe1d..e536b3734 100644 --- a/src/openhuman/agent/harness/session/transcript.rs +++ b/src/openhuman/agent/harness/session/transcript.rs @@ -56,6 +56,14 @@ pub struct TranscriptMeta { pub created: String, pub updated: String, pub turn_count: usize, + /// Cumulative input tokens across all provider calls this session. + pub input_tokens: u64, + /// Cumulative output tokens across all provider calls this session. + pub output_tokens: u64, + /// Cumulative input tokens served from the KV cache. + pub cached_input_tokens: u64, + /// Cumulative amount charged in USD. + pub charged_amount_usd: f64, } /// A parsed session transcript: metadata + exact message array. diff --git a/src/openhuman/providers/compatible.rs b/src/openhuman/providers/compatible.rs index 4694ba9c4..1e0e626e8 100644 --- a/src/openhuman/providers/compatible.rs +++ b/src/openhuman/providers/compatible.rs @@ -1434,6 +1434,7 @@ impl Provider for OpenAiCompatibleProvider { let body = response.text().await?; let chat_response = parse_chat_response_body(&self.name, &body)?; + let usage = Self::extract_usage(&chat_response); let choice = chat_response .choices .into_iter() @@ -1461,7 +1462,7 @@ impl Provider for OpenAiCompatibleProvider { Ok(ProviderChatResponse { text, tool_calls, - usage: None, + usage, }) } @@ -1709,6 +1710,15 @@ mod tests { OpenAiCompatibleProvider::new(name, url, key, AuthStyle::Bearer) } + /// Wrap a ResponseMessage in a minimal ApiChatResponse for tests. + fn wrap_message(message: ResponseMessage) -> ApiChatResponse { + ApiChatResponse { + choices: vec![Choice { message }], + usage: None, + openhuman: None, + } + } + #[test] fn creates_with_key() { let p = make_provider( @@ -2138,7 +2148,7 @@ mod tests { reasoning_content: None, }; - let parsed = OpenAiCompatibleProvider::parse_native_response(message); + let parsed = OpenAiCompatibleProvider::parse_native_response(wrap_message(message)); assert_eq!(parsed.tool_calls.len(), 1); assert_eq!(parsed.tool_calls[0].id, "call_123"); assert_eq!(parsed.tool_calls[0].name, "shell"); @@ -2367,7 +2377,7 @@ mod tests { Some(&serde_json::json!({"location":"London","unit":"c"})) ); - let parsed = OpenAiCompatibleProvider::parse_native_response(ResponseMessage { + let parsed = OpenAiCompatibleProvider::parse_native_response(wrap_message(ResponseMessage { content: None, reasoning_content: None, tool_calls: Some(vec![ToolCall { @@ -2379,7 +2389,7 @@ mod tests { }), }]), function_call: None, - }); + })); assert_eq!(parsed.tool_calls.len(), 1); assert_eq!(parsed.tool_calls[0].id, "call_456"); assert_eq!( @@ -2391,12 +2401,12 @@ mod tests { #[test] fn parse_native_response_recovers_tool_calls_from_json_content() { let content = r#"{"content":"Checking files...","tool_calls":[{"id":"call_json_1","function":{"name":"shell","arguments":"{\"command\":\"ls -la\"}"}}]}"#; - let parsed = OpenAiCompatibleProvider::parse_native_response(ResponseMessage { + let parsed = OpenAiCompatibleProvider::parse_native_response(wrap_message(ResponseMessage { content: Some(content.to_string()), reasoning_content: None, tool_calls: None, function_call: None, - }); + })); assert_eq!(parsed.text.as_deref(), Some("Checking files...")); assert_eq!(parsed.tool_calls.len(), 1); @@ -2407,7 +2417,7 @@ mod tests { #[test] fn parse_native_response_supports_legacy_function_call() { - let parsed = OpenAiCompatibleProvider::parse_native_response(ResponseMessage { + let parsed = OpenAiCompatibleProvider::parse_native_response(wrap_message(ResponseMessage { content: Some("Let me check".to_string()), reasoning_content: None, tool_calls: None, @@ -2417,7 +2427,7 @@ mod tests { r#"{"command":"pwd"}"#.to_string(), )), }), - }); + })); assert_eq!(parsed.tool_calls.len(), 1); assert_eq!(parsed.tool_calls[0].name, "shell"); From 4871a724d1f2d238a7e33bfb8a730d5cc9ddaa71 Mon Sep 17 00:00:00 2001 From: Steven Enamakel Date: Sat, 11 Apr 2026 23:55:01 -0700 Subject: [PATCH 4/6] feat(transcript): implement sub-agent transcript persistence and usage tracking - Added functionality to persist sub-agent conversation transcripts, enabling inspection for debugging and KV cache analysis. - Introduced a new `persist_subagent_transcript` function to handle transcript writing, including metadata for input/output tokens and charged amounts. - Updated the `Agent` implementation to accumulate usage statistics across iterations and include them in the session transcript. - Enhanced the `write_transcript` function to include additional metadata fields, improving the detail of stored transcripts. - Refactored tests to validate the new transcript persistence and usage tracking features, ensuring comprehensive coverage. --- .../agent/harness/session/transcript.rs | 26 +++++++ src/openhuman/agent/harness/session/turn.rs | 31 +++++++- .../agent/harness/subagent_runner.rs | 70 +++++++++++++++++++ src/openhuman/context/guard.rs | 3 + src/openhuman/context/manager.rs | 8 +++ src/openhuman/context/pipeline.rs | 4 ++ 6 files changed, 140 insertions(+), 2 deletions(-) diff --git a/src/openhuman/agent/harness/session/transcript.rs b/src/openhuman/agent/harness/session/transcript.rs index e536b3734..77e511a89 100644 --- a/src/openhuman/agent/harness/session/transcript.rs +++ b/src/openhuman/agent/harness/session/transcript.rs @@ -95,6 +95,18 @@ pub fn write_transcript(path: &Path, messages: &[ChatMessage], meta: &Transcript let _ = writeln!(buf, "created: {}", meta.created); let _ = writeln!(buf, "updated: {}", meta.updated); let _ = writeln!(buf, "turn_count: {}", meta.turn_count); + if meta.input_tokens > 0 || meta.output_tokens > 0 { + let _ = writeln!(buf, "input_tokens: {}", meta.input_tokens); + let _ = writeln!(buf, "output_tokens: {}", meta.output_tokens); + let _ = writeln!(buf, "cached_input_tokens: {}", meta.cached_input_tokens); + if meta.input_tokens > 0 { + let cache_pct = (meta.cached_input_tokens as f64 / meta.input_tokens as f64) * 100.0; + let _ = writeln!(buf, "cache_hit_pct: {:.1}%", cache_pct); + } + if meta.charged_amount_usd > 0.0 { + let _ = writeln!(buf, "charged_usd: ${:.6}", meta.charged_amount_usd); + } + } buf.push_str("-->\n"); // Messages @@ -225,6 +237,12 @@ fn parse_meta(raw: &str) -> Result { created: get("created").unwrap_or_default(), updated: get("updated").unwrap_or_default(), turn_count: get("turn_count").and_then(|s| s.parse().ok()).unwrap_or(0), + input_tokens: get("input_tokens").and_then(|s| s.parse().ok()).unwrap_or(0), + output_tokens: get("output_tokens").and_then(|s| s.parse().ok()).unwrap_or(0), + cached_input_tokens: get("cached_input_tokens").and_then(|s| s.parse().ok()).unwrap_or(0), + charged_amount_usd: get("charged_usd") + .and_then(|s| s.trim_start_matches('$').parse().ok()) + .unwrap_or(0.0), }) } @@ -357,6 +375,10 @@ mod tests { created: "2026-04-11T14:30:00Z".into(), updated: "2026-04-11T14:35:22Z".into(), turn_count: 3, + input_tokens: 5000, + output_tokens: 1200, + cached_input_tokens: 3500, + charged_amount_usd: 0.0045, } } @@ -417,6 +439,10 @@ mod tests { assert_eq!(loaded.meta.created, "2026-04-11T14:30:00Z"); assert_eq!(loaded.meta.updated, "2026-04-11T14:35:22Z"); assert_eq!(loaded.meta.turn_count, 3); + assert_eq!(loaded.meta.input_tokens, 5000); + assert_eq!(loaded.meta.output_tokens, 1200); + assert_eq!(loaded.meta.cached_input_tokens, 3500); + assert!((loaded.meta.charged_amount_usd - 0.0045).abs() < 1e-8); } #[test] diff --git a/src/openhuman/agent/harness/session/turn.rs b/src/openhuman/agent/harness/session/turn.rs index edb0b41d6..feafd9913 100644 --- a/src/openhuman/agent/harness/session/turn.rs +++ b/src/openhuman/agent/harness/session/turn.rs @@ -158,6 +158,12 @@ impl Agent { // can persist it as a session transcript after the turn completes. let mut last_provider_messages: Option> = None; + // Accumulate usage stats across iterations for the transcript. + let mut cumulative_input_tokens: u64 = 0; + let mut cumulative_output_tokens: u64 = 0; + let mut cumulative_cached_input_tokens: u64 = 0; + let mut cumulative_charged_usd: f64 = 0.0; + let turn_body = async { for iteration in 0..self.config.max_tool_iterations { log::info!( @@ -296,6 +302,10 @@ impl Agent { // the provider doesn't return usage. if let Some(ref usage) = resp.usage { self.context.record_usage(usage); + cumulative_input_tokens += usage.input_tokens; + cumulative_output_tokens += usage.output_tokens; + cumulative_cached_input_tokens += usage.cached_input_tokens; + cumulative_charged_usd += usage.charged_amount_usd; } resp } @@ -474,7 +484,13 @@ impl Agent { // resume with a byte-identical prefix for KV cache reuse. if result.is_ok() { if let Some(ref messages) = last_provider_messages { - self.persist_session_transcript(messages); + self.persist_session_transcript( + messages, + cumulative_input_tokens, + cumulative_output_tokens, + cumulative_cached_input_tokens, + cumulative_charged_usd, + ); } } @@ -876,7 +892,14 @@ impl Agent { /// Best-effort: failures are logged and silently ignored. The JSONL /// conversation store remains the authoritative persistence layer; /// session transcripts are an optimization for KV cache stability. - pub(super) fn persist_session_transcript(&mut self, messages: &[ChatMessage]) { + pub(super) fn persist_session_transcript( + &mut self, + messages: &[ChatMessage], + input_tokens: u64, + output_tokens: u64, + cached_input_tokens: u64, + charged_amount_usd: f64, + ) { // Resolve the transcript path on first write. if self.session_transcript_path.is_none() { match transcript::resolve_new_transcript_path( @@ -911,6 +934,10 @@ impl Agent { created: now.clone(), updated: now, turn_count: self.context.stats().session_memory_current_turn as usize, + input_tokens, + output_tokens, + cached_input_tokens, + charged_amount_usd, }; if let Err(err) = transcript::write_transcript(path, messages, &meta) { diff --git a/src/openhuman/agent/harness/subagent_runner.rs b/src/openhuman/agent/harness/subagent_runner.rs index 3268a034f..8eb65808f 100644 --- a/src/openhuman/agent/harness/subagent_runner.rs +++ b/src/openhuman/agent/harness/subagent_runner.rs @@ -30,12 +30,14 @@ use super::definition::{AgentDefinition, PromptSource, ToolScope}; use super::fork_context::{current_fork, current_parent, ForkContext, ParentExecutionContext}; +use super::session::transcript; use crate::openhuman::context::prompt::{ extract_cache_boundary, render_subagent_system_prompt, SubagentRenderOptions, }; use crate::openhuman::providers::{ChatMessage, ChatRequest, Provider, ToolCall}; use crate::openhuman::tools::{Tool, ToolCategory, ToolSpec}; use std::collections::HashSet; +use std::path::Path; use std::time::{Duration, Instant}; use thiserror::Error; @@ -297,6 +299,13 @@ async fn run_typed_mode( ) .await?; + persist_subagent_transcript( + &parent.workspace_dir, + &definition.id, + &history, + system_prompt_cache_boundary, + ); + Ok(SubagentRunOutcome { task_id: task_id.to_string(), agent_id: definition.id.clone(), @@ -379,6 +388,13 @@ async fn run_fork_mode( ) .await?; + persist_subagent_transcript( + &parent.workspace_dir, + &definition.id, + &history, + fork.cache_boundary, + ); + Ok(SubagentRunOutcome { task_id: task_id.to_string(), agent_id: definition.id.clone(), @@ -389,6 +405,60 @@ async fn run_fork_mode( }) } +// ───────────────────────────────────────────────────────────────────────────── +// Session transcript persistence for sub-agents +// ───────────────────────────────────────────────────────────────────────────── + +/// Best-effort: persist the sub-agent's conversation as a session transcript +/// so it can be inspected for debugging and KV cache analysis. +fn persist_subagent_transcript( + workspace_dir: &Path, + agent_id: &str, + history: &[ChatMessage], + cache_boundary: Option, +) { + let path = match transcript::resolve_new_transcript_path(workspace_dir, agent_id) { + Ok(p) => p, + Err(err) => { + tracing::debug!( + agent_id = %agent_id, + error = %err, + "[subagent_runner] failed to resolve transcript path" + ); + return; + } + }; + + let now = chrono::Utc::now().to_rfc3339(); + let meta = transcript::TranscriptMeta { + agent_name: agent_id.to_string(), + dispatcher: "native".into(), + cache_boundary, + created: now.clone(), + updated: now, + turn_count: 1, + input_tokens: 0, + output_tokens: 0, + cached_input_tokens: 0, + charged_amount_usd: 0.0, + }; + + if let Err(err) = transcript::write_transcript(&path, history, &meta) { + tracing::debug!( + agent_id = %agent_id, + error = %err, + "[subagent_runner] failed to write transcript" + ); + } else { + tracing::debug!( + agent_id = %agent_id, + messages = history.len(), + path = %path.display(), + "[subagent_runner] transcript written" + ); + } +} + // ───────────────────────────────────────────────────────────────────────────── // Inner tool-call loop (slim version of agent::loop_::tool_loop) // ───────────────────────────────────────────────────────────────────────────── diff --git a/src/openhuman/context/guard.rs b/src/openhuman/context/guard.rs index c809ef786..d23a8d82e 100644 --- a/src/openhuman/context/guard.rs +++ b/src/openhuman/context/guard.rs @@ -180,6 +180,7 @@ mod tests { input_tokens: 10_000, output_tokens: 5_000, context_window: 100_000, + ..Default::default() }); assert_eq!(guard.check(), ContextCheckResult::Ok); } @@ -191,6 +192,7 @@ mod tests { input_tokens: 85_000, output_tokens: 6_000, context_window: 100_000, + ..Default::default() }); assert_eq!(guard.check(), ContextCheckResult::CompactionNeeded); } @@ -202,6 +204,7 @@ mod tests { input_tokens: 90_000, output_tokens: 6_000, context_window: 100_000, + ..Default::default() }); guard.record_compaction_failure(); diff --git a/src/openhuman/context/manager.rs b/src/openhuman/context/manager.rs index 2c97a83d4..f68d50350 100644 --- a/src/openhuman/context/manager.rs +++ b/src/openhuman/context/manager.rs @@ -461,6 +461,7 @@ mod tests { input_tokens: 5_000, output_tokens: 500, context_window: 100_000, + ..Default::default() }); let mut history = vec![user("hi")]; @@ -480,6 +481,7 @@ mod tests { input_tokens: 92_000, output_tokens: 4_000, context_window: 100_000, + ..Default::default() }); // Build a history with several older tool-result envelopes @@ -528,6 +530,7 @@ mod tests { input_tokens: 92_000, output_tokens: 4_000, context_window: 100_000, + ..Default::default() }); // History with no old tool-result envelopes — microcompact @@ -561,6 +564,7 @@ mod tests { input_tokens: 92_000, output_tokens: 4_000, context_window: 100_000, + ..Default::default() }); // Try three times — each call sends the pipeline into @@ -583,6 +587,7 @@ mod tests { input_tokens: 96_000, output_tokens: 2_000, context_window: 100_000, + ..Default::default() }); let mut history = vec![user("x")]; let outcome = manager.reduce_before_call(&mut history).await.unwrap(); @@ -608,6 +613,7 @@ mod tests { input_tokens: 92_000, output_tokens: 4_000, context_window: 100_000, + ..Default::default() }); // No old tool-result envelopes — microcompact cannot free @@ -645,6 +651,7 @@ mod tests { input_tokens: 96_000, output_tokens: 2_000, context_window: 100_000, + ..Default::default() }); let mut history = vec![user("a"), user("b"), user("c")]; @@ -661,6 +668,7 @@ mod tests { input_tokens: 10_000, output_tokens: 2_000, context_window: 100_000, + ..Default::default() }); manager.tick_turn(); manager.record_tool_calls(3); diff --git a/src/openhuman/context/pipeline.rs b/src/openhuman/context/pipeline.rs index e3225f97e..6a3eb7d20 100644 --- a/src/openhuman/context/pipeline.rs +++ b/src/openhuman/context/pipeline.rs @@ -288,6 +288,7 @@ mod tests { input_tokens: 92_000, output_tokens: 4_000, context_window: 100_000, + ..Default::default() }); } @@ -298,6 +299,7 @@ mod tests { input_tokens: 10_000, output_tokens: 1_000, context_window: 100_000, + ..Default::default() }); let mut history = vec![ user("hi"), @@ -412,6 +414,7 @@ mod tests { input_tokens: 96_000, output_tokens: 2_000, context_window: 100_000, + ..Default::default() }); // Trip the circuit breaker. pipeline.guard.record_compaction_failure(); @@ -430,6 +433,7 @@ mod tests { input_tokens: 10_000, output_tokens: 2_000, context_window: 100_000, + ..Default::default() }); assert_eq!(pipeline.session_memory_snapshot().total_tokens, 12_000); } From 3b1ed77976009e9027aed4771b45107f95fba464 Mon Sep 17 00:00:00 2001 From: Steven Enamakel Date: Sun, 12 Apr 2026 00:07:24 -0700 Subject: [PATCH 5/6] refactor(transcript): improve code readability and structure - Reformatted the `write_transcript` and `read_transcript` functions for better clarity by adjusting line breaks and indentation. - Enhanced the `parse_meta` function to improve readability by restructuring the `get` closure. - Updated test cases to maintain consistency with the new formatting, ensuring clarity in the test structure. - Overall, these changes aim to enhance code maintainability and readability without altering functionality. --- .../agent/harness/session/transcript.rs | 60 +++++++++++------- src/openhuman/agent/harness/session/turn.rs | 9 ++- src/openhuman/providers/compatible.rs | 63 ++++++++++--------- 3 files changed, 76 insertions(+), 56 deletions(-) diff --git a/src/openhuman/agent/harness/session/transcript.rs b/src/openhuman/agent/harness/session/transcript.rs index 77e511a89..7b6cb42f4 100644 --- a/src/openhuman/agent/harness/session/transcript.rs +++ b/src/openhuman/agent/harness/session/transcript.rs @@ -77,7 +77,11 @@ pub struct SessionTranscript { /// Write a session transcript to `path`. Full rewrite (not append) /// because context reduction may have removed earlier messages. -pub fn write_transcript(path: &Path, messages: &[ChatMessage], meta: &TranscriptMeta) -> Result<()> { +pub fn write_transcript( + path: &Path, + messages: &[ChatMessage], + meta: &TranscriptMeta, +) -> Result<()> { if let Some(parent) = path.parent() { fs::create_dir_all(parent) .with_context(|| format!("create transcript dir {}", parent.display()))?; @@ -135,11 +139,11 @@ pub fn write_transcript(path: &Path, messages: &[ChatMessage], meta: &Transcript /// Read a session transcript from `path` and return the exact messages. pub fn read_transcript(path: &Path) -> Result { - let raw = fs::read_to_string(path) - .with_context(|| format!("read transcript {}", path.display()))?; + let raw = + fs::read_to_string(path).with_context(|| format!("read transcript {}", path.display()))?; - let meta = parse_meta(&raw) - .with_context(|| format!("parse transcript meta in {}", path.display()))?; + let meta = + parse_meta(&raw).with_context(|| format!("parse transcript meta in {}", path.display()))?; let messages = parse_messages(&raw) .with_context(|| format!("parse transcript messages in {}", path.display()))?; @@ -218,16 +222,14 @@ fn parse_meta(raw: &str) -> Result { let header = &raw[header_start..header_start + header_end + 3]; let get = |key: &str| -> Option { - header - .lines() - .find_map(|line| { - let line = line.trim(); - if line.starts_with(&format!("{key}:")) { - Some(line[key.len() + 1..].trim().to_string()) - } else { - None - } - }) + header.lines().find_map(|line| { + let line = line.trim(); + if line.starts_with(&format!("{key}:")) { + Some(line[key.len() + 1..].trim().to_string()) + } else { + None + } + }) }; Ok(TranscriptMeta { @@ -237,9 +239,15 @@ fn parse_meta(raw: &str) -> Result { created: get("created").unwrap_or_default(), updated: get("updated").unwrap_or_default(), turn_count: get("turn_count").and_then(|s| s.parse().ok()).unwrap_or(0), - input_tokens: get("input_tokens").and_then(|s| s.parse().ok()).unwrap_or(0), - output_tokens: get("output_tokens").and_then(|s| s.parse().ok()).unwrap_or(0), - cached_input_tokens: get("cached_input_tokens").and_then(|s| s.parse().ok()).unwrap_or(0), + input_tokens: get("input_tokens") + .and_then(|s| s.parse().ok()) + .unwrap_or(0), + output_tokens: get("output_tokens") + .and_then(|s| s.parse().ok()) + .unwrap_or(0), + cached_input_tokens: get("cached_input_tokens") + .and_then(|s| s.parse().ok()) + .unwrap_or(0), charged_amount_usd: get("charged_usd") .and_then(|s| s.trim_start_matches('$').parse().ok()) .unwrap_or(0.0), @@ -307,7 +315,13 @@ fn today_session_dir(workspace_dir: &Path) -> PathBuf { fn sanitize_agent_name(name: &str) -> String { name.chars() - .map(|c| if c.is_alphanumeric() || c == '-' || c == '_' { c } else { '_' }) + .map(|c| { + if c.is_alphanumeric() || c == '-' || c == '_' { + c + } else { + '_' + } + }) .collect() } @@ -359,7 +373,9 @@ mod tests { fn sample_messages() -> Vec { vec![ - ChatMessage::system("You are a helpful assistant.\n\n## Tools\n\n- **shell**: Run commands"), + ChatMessage::system( + "You are a helpful assistant.\n\n## Tools\n\n- **shell**: Run commands", + ), ChatMessage::user("What files are in /tmp?"), ChatMessage::assistant("Let me check that for you."), ChatMessage::tool("{\"tool_call_id\":\"tc1\",\"content\":\"file1.txt\\nfile2.txt\"}"), @@ -409,9 +425,7 @@ mod tests { let path = dir.path().join("escape_test.md"); let messages = vec![ ChatMessage::system("Normal system prompt"), - ChatMessage::user( - "Here is some tricky content:\n\nand more after", - ), + ChatMessage::user("Here is some tricky content:\n\nand more after"), ChatMessage::assistant("Got it, that had a in it."), ]; let meta = sample_meta(); diff --git a/src/openhuman/agent/harness/session/turn.rs b/src/openhuman/agent/harness/session/turn.rs index feafd9913..df71a1cd3 100644 --- a/src/openhuman/agent/harness/session/turn.rs +++ b/src/openhuman/agent/harness/session/turn.rs @@ -244,8 +244,9 @@ impl Agent { let messages = if let Some(mut cached) = self.cached_transcript_messages.take() { // Append only the delta (new user message) from the // end of the current history. - let new_tail = self.tool_dispatcher - .to_provider_messages(&self.history[self.history.len().saturating_sub(1)..]); + let new_tail = self.tool_dispatcher.to_provider_messages( + &self.history[self.history.len().saturating_sub(1)..], + ); cached.extend(new_tail); log::info!( "[transcript] resumed from cached transcript prefix_len={} new_tail={}", @@ -856,7 +857,9 @@ impl Agent { match transcript::read_transcript(&path) { Ok(session) => { if session.messages.is_empty() { - log::debug!("[transcript] previous transcript is empty — skipping resume"); + log::debug!( + "[transcript] previous transcript is empty — skipping resume" + ); return; } // Restore the cache boundary from the transcript diff --git a/src/openhuman/providers/compatible.rs b/src/openhuman/providers/compatible.rs index 1e0e626e8..a78251fbf 100644 --- a/src/openhuman/providers/compatible.rs +++ b/src/openhuman/providers/compatible.rs @@ -2377,19 +2377,20 @@ mod tests { Some(&serde_json::json!({"location":"London","unit":"c"})) ); - let parsed = OpenAiCompatibleProvider::parse_native_response(wrap_message(ResponseMessage { - content: None, - reasoning_content: None, - tool_calls: Some(vec![ToolCall { - id: Some("call_456".to_string()), - kind: Some("function".to_string()), - function: Some(Function { - name: Some("get_weather".to_string()), - arguments: Some(serde_json::json!({"location":"London","unit":"c"})), - }), - }]), - function_call: None, - })); + let parsed = + OpenAiCompatibleProvider::parse_native_response(wrap_message(ResponseMessage { + content: None, + reasoning_content: None, + tool_calls: Some(vec![ToolCall { + id: Some("call_456".to_string()), + kind: Some("function".to_string()), + function: Some(Function { + name: Some("get_weather".to_string()), + arguments: Some(serde_json::json!({"location":"London","unit":"c"})), + }), + }]), + function_call: None, + })); assert_eq!(parsed.tool_calls.len(), 1); assert_eq!(parsed.tool_calls[0].id, "call_456"); assert_eq!( @@ -2401,12 +2402,13 @@ mod tests { #[test] fn parse_native_response_recovers_tool_calls_from_json_content() { let content = r#"{"content":"Checking files...","tool_calls":[{"id":"call_json_1","function":{"name":"shell","arguments":"{\"command\":\"ls -la\"}"}}]}"#; - let parsed = OpenAiCompatibleProvider::parse_native_response(wrap_message(ResponseMessage { - content: Some(content.to_string()), - reasoning_content: None, - tool_calls: None, - function_call: None, - })); + let parsed = + OpenAiCompatibleProvider::parse_native_response(wrap_message(ResponseMessage { + content: Some(content.to_string()), + reasoning_content: None, + tool_calls: None, + function_call: None, + })); assert_eq!(parsed.text.as_deref(), Some("Checking files...")); assert_eq!(parsed.tool_calls.len(), 1); @@ -2417,17 +2419,18 @@ mod tests { #[test] fn parse_native_response_supports_legacy_function_call() { - let parsed = OpenAiCompatibleProvider::parse_native_response(wrap_message(ResponseMessage { - content: Some("Let me check".to_string()), - reasoning_content: None, - tool_calls: None, - function_call: Some(Function { - name: Some("shell".to_string()), - arguments: Some(serde_json::Value::String( - r#"{"command":"pwd"}"#.to_string(), - )), - }), - })); + let parsed = + OpenAiCompatibleProvider::parse_native_response(wrap_message(ResponseMessage { + content: Some("Let me check".to_string()), + reasoning_content: None, + tool_calls: None, + function_call: Some(Function { + name: Some("shell".to_string()), + arguments: Some(serde_json::Value::String( + r#"{"command":"pwd"}"#.to_string(), + )), + }), + })); assert_eq!(parsed.tool_calls.len(), 1); assert_eq!(parsed.tool_calls[0].name, "shell"); From 1011fa6810f873a894df79fa68e0b1fce6165f9d Mon Sep 17 00:00:00 2001 From: Steven Enamakel Date: Sun, 12 Apr 2026 00:32:22 -0700 Subject: [PATCH 6/6] fix(provider): return Result from parse_native_response, use Option for OpenHumanUsage fields - parse_native_response now returns Result and propagates an error when choices is empty instead of silently returning text: None - OpenHumanUsage token fields changed to Option so missing keys yield None and allow fallback to standard OpenAI usage block - extract_usage now logs which source (openhuman vs standard) provided token counts via structured tracing - run_inner_loop returns AggregatedUsage so subagent transcripts reflect real usage instead of zeros - Document escape_content/unescape_content edge case with pre-existing escape sequences --- .../agent/harness/session/transcript.rs | 11 +++ .../agent/harness/subagent_runner.rs | 37 +++++-- src/openhuman/providers/compatible.rs | 99 +++++++++++-------- 3 files changed, 99 insertions(+), 48 deletions(-) diff --git a/src/openhuman/agent/harness/session/transcript.rs b/src/openhuman/agent/harness/session/transcript.rs index 7b6cb42f4..d4c4fb5de 100644 --- a/src/openhuman/agent/harness/session/transcript.rs +++ b/src/openhuman/agent/harness/session/transcript.rs @@ -204,10 +204,21 @@ pub fn find_latest_transcript(workspace_dir: &Path, agent_name: &str) -> Option< // ── Internals ──────────────────────────────────────────────────────── +/// Escape the closing delimiter so it cannot appear literally in message +/// content. Replaces `` with ``. +/// +/// **Known edge case:** if the original content already contains the literal +/// escape sequence ``, `unescape_content` will convert it into +/// ``, corrupting the round-trip. In practice this sequence is +/// vanishingly unlikely in LLM output. A fully reversible fix would require +/// a two-pass escaping scheme (e.g. also escaping the backslash), but that +/// added complexity is not warranted unless the edge case materialises. fn escape_content(content: &str) -> String { content.replace(MSG_CLOSE, MSG_CLOSE_ESCAPED) } +/// Reverse the escaping applied by [`escape_content`]. See that function's +/// doc comment for the known edge case with pre-existing escape sequences. fn unescape_content(content: &str) -> String { content.replace(MSG_CLOSE_ESCAPED, MSG_CLOSE) } diff --git a/src/openhuman/agent/harness/subagent_runner.rs b/src/openhuman/agent/harness/subagent_runner.rs index 8eb65808f..33c18418b 100644 --- a/src/openhuman/agent/harness/subagent_runner.rs +++ b/src/openhuman/agent/harness/subagent_runner.rs @@ -284,7 +284,7 @@ async fn run_typed_mode( ]; // ── Run the inner tool-call loop ─────────────────────────────────── - let (output, iterations) = run_inner_loop( + let (output, iterations, agg_usage) = run_inner_loop( parent.provider.as_ref(), &mut history, &parent.all_tools, @@ -304,6 +304,7 @@ async fn run_typed_mode( &definition.id, &history, system_prompt_cache_boundary, + &agg_usage, ); Ok(SubagentRunOutcome { @@ -373,7 +374,7 @@ async fn run_fork_mode( // Use the parent's iteration cap, not the synthetic fork definition's. let max_iterations = parent.agent_config.max_tool_iterations.max(1); - let (output, iterations) = run_inner_loop( + let (output, iterations, agg_usage) = run_inner_loop( parent.provider.as_ref(), &mut history, &parent.all_tools, @@ -393,6 +394,7 @@ async fn run_fork_mode( &definition.id, &history, fork.cache_boundary, + &agg_usage, ); Ok(SubagentRunOutcome { @@ -416,6 +418,7 @@ fn persist_subagent_transcript( agent_id: &str, history: &[ChatMessage], cache_boundary: Option, + usage: &AggregatedUsage, ) { let path = match transcript::resolve_new_transcript_path(workspace_dir, agent_id) { Ok(p) => p, @@ -437,10 +440,10 @@ fn persist_subagent_transcript( created: now.clone(), updated: now, turn_count: 1, - input_tokens: 0, - output_tokens: 0, - cached_input_tokens: 0, - charged_amount_usd: 0.0, + input_tokens: usage.input_tokens, + output_tokens: usage.output_tokens, + cached_input_tokens: usage.cached_input_tokens, + charged_amount_usd: usage.charged_amount_usd, }; if let Err(err) = transcript::write_transcript(&path, history, &meta) { @@ -463,6 +466,15 @@ fn persist_subagent_transcript( // Inner tool-call loop (slim version of agent::loop_::tool_loop) // ───────────────────────────────────────────────────────────────────────────── +/// Cumulative usage stats gathered across all provider calls in the loop. +#[derive(Debug, Clone, Default)] +struct AggregatedUsage { + input_tokens: u64, + output_tokens: u64, + cached_input_tokens: u64, + charged_amount_usd: f64, +} + #[allow(clippy::too_many_arguments)] async fn run_inner_loop( provider: &dyn Provider, @@ -476,7 +488,7 @@ async fn run_inner_loop( system_prompt_cache_boundary: Option, task_id: &str, agent_id: &str, -) -> Result<(String, usize), SubagentRunError> { +) -> Result<(String, usize, AggregatedUsage), SubagentRunError> { let max_iterations = max_iterations.max(1); let supports_native = provider.supports_native_tools() && !tool_specs.is_empty(); let request_tools = if supports_native { @@ -485,6 +497,8 @@ async fn run_inner_loop( None }; + let mut usage = AggregatedUsage::default(); + for iteration in 0..max_iterations { tracing::debug!( task_id = %task_id, @@ -506,6 +520,13 @@ async fn run_inner_loop( ) .await?; + if let Some(ref u) = resp.usage { + usage.input_tokens += u.input_tokens; + usage.output_tokens += u.output_tokens; + usage.cached_input_tokens += u.cached_input_tokens; + usage.charged_amount_usd += u.charged_amount_usd; + } + let response_text = resp.text.clone().unwrap_or_default(); let native_calls: Vec = resp.tool_calls.clone(); @@ -518,7 +539,7 @@ async fn run_inner_loop( "[subagent_runner] no tool calls — returning final response" ); history.push(ChatMessage::assistant(response_text.clone())); - return Ok((response_text, iteration + 1)); + return Ok((response_text, iteration + 1, usage)); } // Persist assistant message with the original tool_calls payload so diff --git a/src/openhuman/providers/compatible.rs b/src/openhuman/providers/compatible.rs index a78251fbf..5a7cde161 100644 --- a/src/openhuman/providers/compatible.rs +++ b/src/openhuman/providers/compatible.rs @@ -332,14 +332,11 @@ struct OpenHumanMeta { #[derive(Debug, Deserialize, Default)] struct OpenHumanUsage { - #[serde(default)] - input_tokens: u64, - #[serde(default)] - output_tokens: u64, - #[serde(default)] - total_tokens: u64, - #[serde(default)] - cached_input_tokens: u64, + input_tokens: Option, + output_tokens: Option, + #[allow(dead_code)] + total_tokens: Option, + cached_input_tokens: Option, } #[derive(Debug, Deserialize, Default)] @@ -994,19 +991,18 @@ impl OpenAiCompatibleProvider { modified_messages } - fn parse_native_response(api_response: ApiChatResponse) -> ProviderChatResponse { + fn parse_native_response( + api_response: ApiChatResponse, + provider_name: &str, + ) -> anyhow::Result { let usage = Self::extract_usage(&api_response); - let message = match api_response.choices.into_iter().next() { - Some(choice) => choice.message, - None => { - return ProviderChatResponse { - text: None, - tool_calls: vec![], - usage, - }; - } - }; + let message = api_response + .choices + .into_iter() + .next() + .map(|c| c.message) + .ok_or_else(|| anyhow::anyhow!("No choices in response from {}", provider_name))?; let mut text = message.effective_content_optional(); let mut tool_calls = message @@ -1053,11 +1049,11 @@ impl OpenAiCompatibleProvider { } } - ProviderChatResponse { + Ok(ProviderChatResponse { text, tool_calls, usage, - } + }) } /// Extract usage info from API response, preferring the OpenHuman @@ -1075,27 +1071,44 @@ impl OpenAiCompatibleProvider { let oh_usage = oh.and_then(|o| o.usage.as_ref()); let oh_billing = oh.and_then(|o| o.billing.as_ref()); + // Prefer OpenHuman metadata when the fields are actually present; + // fall back to the standard OpenAI usage block when they are None. let input_tokens = oh_usage - .map(|u| u.input_tokens) + .and_then(|u| u.input_tokens) .or(std_usage.map(|u| u.prompt_tokens)) .unwrap_or(0); let output_tokens = oh_usage - .map(|u| u.output_tokens) + .and_then(|u| u.output_tokens) .or(std_usage.map(|u| u.completion_tokens)) .unwrap_or(0); let cached_input_tokens = oh_usage - .map(|u| u.cached_input_tokens) + .and_then(|u| u.cached_input_tokens) .or(std_usage .and_then(|u| u.prompt_tokens_details.as_ref()) .map(|d| d.cached_tokens)) .unwrap_or(0); + let charged_amount_usd = oh_billing.map(|b| b.charged_amount_usd).unwrap_or(0.0); + + let from_openhuman = oh_usage.is_some(); + let from_standard = std_usage.is_some() && !from_openhuman; + let has_billing = oh_billing.is_some(); + tracing::debug!( + from_openhuman, + from_standard, + has_billing, + input_tokens, + output_tokens, + cached_input_tokens, + charged_amount_usd, + "[provider:usage] extract_usage resolved token counts" + ); Some(ProviderUsageInfo { input_tokens, output_tokens, context_window: 0, cached_input_tokens, - charged_amount_usd: oh_billing.map(|b| b.charged_amount_usd).unwrap_or(0.0), + charged_amount_usd, }) } @@ -1568,11 +1581,7 @@ impl Provider for OpenAiCompatibleProvider { } let native_response: ApiChatResponse = response.json().await?; - if native_response.choices.is_empty() { - anyhow::bail!("No response from {}", self.name); - } - - Ok(Self::parse_native_response(native_response)) + Self::parse_native_response(native_response, &self.name) } fn supports_native_tools(&self) -> bool { @@ -2148,7 +2157,8 @@ mod tests { reasoning_content: None, }; - let parsed = OpenAiCompatibleProvider::parse_native_response(wrap_message(message)); + let parsed = + OpenAiCompatibleProvider::parse_native_response(wrap_message(message), "test").unwrap(); assert_eq!(parsed.tool_calls.len(), 1); assert_eq!(parsed.tool_calls[0].id, "call_123"); assert_eq!(parsed.tool_calls[0].name, "shell"); @@ -2377,8 +2387,8 @@ mod tests { Some(&serde_json::json!({"location":"London","unit":"c"})) ); - let parsed = - OpenAiCompatibleProvider::parse_native_response(wrap_message(ResponseMessage { + let parsed = OpenAiCompatibleProvider::parse_native_response( + wrap_message(ResponseMessage { content: None, reasoning_content: None, tool_calls: Some(vec![ToolCall { @@ -2390,7 +2400,10 @@ mod tests { }), }]), function_call: None, - })); + }), + "test", + ) + .unwrap(); assert_eq!(parsed.tool_calls.len(), 1); assert_eq!(parsed.tool_calls[0].id, "call_456"); assert_eq!( @@ -2402,13 +2415,16 @@ mod tests { #[test] fn parse_native_response_recovers_tool_calls_from_json_content() { let content = r#"{"content":"Checking files...","tool_calls":[{"id":"call_json_1","function":{"name":"shell","arguments":"{\"command\":\"ls -la\"}"}}]}"#; - let parsed = - OpenAiCompatibleProvider::parse_native_response(wrap_message(ResponseMessage { + let parsed = OpenAiCompatibleProvider::parse_native_response( + wrap_message(ResponseMessage { content: Some(content.to_string()), reasoning_content: None, tool_calls: None, function_call: None, - })); + }), + "test", + ) + .unwrap(); assert_eq!(parsed.text.as_deref(), Some("Checking files...")); assert_eq!(parsed.tool_calls.len(), 1); @@ -2419,8 +2435,8 @@ mod tests { #[test] fn parse_native_response_supports_legacy_function_call() { - let parsed = - OpenAiCompatibleProvider::parse_native_response(wrap_message(ResponseMessage { + let parsed = OpenAiCompatibleProvider::parse_native_response( + wrap_message(ResponseMessage { content: Some("Let me check".to_string()), reasoning_content: None, tool_calls: None, @@ -2430,7 +2446,10 @@ mod tests { r#"{"command":"pwd"}"#.to_string(), )), }), - })); + }), + "test", + ) + .unwrap(); assert_eq!(parsed.tool_calls.len(), 1); assert_eq!(parsed.tool_calls[0].name, "shell");