From eda8e469b770167292827f8b2a21706b86f8e91c Mon Sep 17 00:00:00 2001 From: Chris Busillo Date: Fri, 3 Apr 2026 09:24:23 -0400 Subject: [PATCH 1/2] Add thread/resume compatibility shim --- code-rs/Cargo.lock | 1 + code-rs/app-server/Cargo.toml | 1 + .../app-server/src/code_message_processor.rs | 272 ++++++++++++++++++ code-rs/app-server/src/message_processor.rs | 22 ++ 4 files changed, 296 insertions(+) diff --git a/code-rs/Cargo.lock b/code-rs/Cargo.lock index 61de029d383..20637765f69 100644 --- a/code-rs/Cargo.lock +++ b/code-rs/Cargo.lock @@ -897,6 +897,7 @@ version = "0.0.0" dependencies = [ "anyhow", "assert_cmd", + "chrono", "clap", "code-app-server-protocol", "code-arg0", diff --git a/code-rs/app-server/Cargo.toml b/code-rs/app-server/Cargo.toml index 13e6e272f09..56f0a6c5c67 100644 --- a/code-rs/app-server/Cargo.toml +++ b/code-rs/app-server/Cargo.toml @@ -26,6 +26,7 @@ code-app-server-protocol = { workspace = true } code-protocol = { workspace = true } code-utils-absolute-path = { workspace = true } code-utils-json-to-toml = { workspace = true } +chrono = { workspace = true } futures = { workspace = true } # We should only be using mcp-types for JSON-RPC types: it would be nice to # split this out into a separate crate at some point. diff --git a/code-rs/app-server/src/code_message_processor.rs b/code-rs/app-server/src/code_message_processor.rs index 8737ef21527..45dd2c9be11 100644 --- a/code-rs/app-server/src/code_message_processor.rs +++ b/code-rs/app-server/src/code_message_processor.rs @@ -12,6 +12,9 @@ use code_app_server_protocol::GetAccountResponse; use code_app_server_protocol::LoginAccountParams; use code_app_server_protocol::LoginAccountResponse; use code_app_server_protocol::LogoutAccountResponse; +use code_app_server_protocol::Thread; +use code_app_server_protocol::ThreadResumeParams; +use code_app_server_protocol::ThreadResumeResponse; use code_app_server_protocol::ToolRequestUserInputOption; use code_app_server_protocol::ToolRequestUserInputParams; use code_app_server_protocol::ToolRequestUserInputQuestion; @@ -21,6 +24,7 @@ use code_core::CodexConversation; use code_core::ConversationManager; use code_core::NewConversation; use code_core::RolloutRecorder; +use code_core::SessionCatalog; use code_core::Cursor; use code_core::config::Config; use code_core::config::ConfigOverrides; @@ -49,6 +53,7 @@ use tokio::time::Duration; use tokio::time::timeout; use tracing::error; use uuid::Uuid; +use chrono::DateTime; use crate::error_code::INTERNAL_ERROR_CODE; use crate::error_code::INVALID_REQUEST_ERROR_CODE; @@ -145,6 +150,7 @@ pub struct CodexMessageProcessor { active_login: Arc>>, // Queue of pending interrupt requests per conversation. We reply when TurnAborted arrives. pending_interrupts: Arc>>>, + resumed_conversation_aliases: Arc>>, #[allow(dead_code)] pending_fuzzy_searches: Arc>>>, } @@ -166,10 +172,23 @@ impl CodexMessageProcessor { conversation_listeners: HashMap::new(), active_login: Arc::new(Mutex::new(None)), pending_interrupts: Arc::new(Mutex::new(HashMap::new())), + resumed_conversation_aliases: Arc::new(Mutex::new(HashMap::new())), pending_fuzzy_searches: Arc::new(Mutex::new(HashMap::new())), } } + async fn resolve_conversation_id_alias( + &self, + conversation_id: ConversationId, + ) -> ConversationId { + self.resumed_conversation_aliases + .lock() + .await + .get(&conversation_id) + .copied() + .unwrap_or(conversation_id) + } + pub async fn process_request(&mut self, request: ClientRequest) { self.process_request_for_connection(ConnectionId(0), request) .await; @@ -589,6 +608,7 @@ impl CodexMessageProcessor { conversation_id, items, } = params; + let conversation_id = self.resolve_conversation_id_alias(conversation_id).await; let Ok(conversation) = self .conversation_manager .get_conversation(conversation_id) @@ -682,6 +702,7 @@ impl CodexMessageProcessor { params: InterruptConversationParams, ) { let InterruptConversationParams { conversation_id } = params; + let conversation_id = self.resolve_conversation_id_alias(conversation_id).await; let Ok(conversation) = self .conversation_manager .get_conversation(conversation_id) @@ -709,6 +730,7 @@ impl CodexMessageProcessor { params: AddConversationListenerParams, ) { let AddConversationListenerParams { conversation_id } = params; + let conversation_id = self.resolve_conversation_id_alias(conversation_id).await; let Ok(conversation) = self .conversation_manager .get_conversation(conversation_id) @@ -954,6 +976,133 @@ impl CodexMessageProcessor { } } + pub(crate) async fn thread_resume_v2( + &self, + request_id: RequestId, + params: ThreadResumeParams, + ) { + let unsupported_history = params.history.is_some(); + let thread_id = params.thread_id.clone(); + let catalog = SessionCatalog::new(self.config.code_home.clone()); + + let catalog_entry = match catalog.find_by_id(&thread_id).await { + Ok(entry) => entry, + Err(err) => { + let error = JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: format!("failed to resolve thread: {err}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + } + }; + + let rollout_path = match catalog_entry + .as_ref() + .map(|entry| catalog.entry_rollout_path(entry)) + .or_else(|| params.path.clone()) + { + Some(path) => path, + None => { + let message = if unsupported_history { + "thread/resume.history is not supported by the Every Code app-server without a rollout path" + } else { + "thread not found" + }; + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: message.to_string(), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + } + }; + + let overrides = NewConversationParams { + model: params.model.clone(), + profile: None, + cwd: params.cwd.clone(), + approval_policy: params + .approval_policy + .clone() + .map(|approval_policy| approval_policy.to_core()), + sandbox: params.sandbox.map(|sandbox| sandbox.to_core()), + config: params.config.clone(), + base_instructions: params.base_instructions.clone(), + include_plan_tool: None, + dynamic_tools: None, + include_apply_patch_tool: None, + }; + let config = match derive_config_from_params(overrides, self.code_linux_sandbox_exe.clone()) { + Ok(config) => config, + Err(err) => { + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("error deriving config: {err}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + } + }; + + match self + .conversation_manager + .resume_conversation_from_rollout( + config.clone(), + rollout_path.clone(), + Arc::clone(&self.auth_manager), + ) + .await + { + Ok(NewConversation { + conversation_id, + session_configured: _, + .. + }) => { + let thread = thread_resume_response_thread( + &thread_id, + catalog_entry.as_ref(), + &config, + rollout_path, + ); + self.outgoing + .send_response( + request_id, + ThreadResumeResponse { + thread, + model: config.model.clone(), + model_provider: config.model_provider_id.clone(), + cwd: config.cwd.clone(), + approval_policy: map_ask_for_approval_to_wire(config.approval_policy).into(), + sandbox: map_sandbox_policy_to_wire(config.sandbox_policy.clone()).into(), + reasoning_effort: Some(config.model_reasoning_effort.into()), + }, + ) + .await; + + if let Ok(requested_conversation_id) = ConversationId::from_string(&thread_id) + && requested_conversation_id != conversation_id + { + self.resumed_conversation_aliases + .lock() + .await + .insert(requested_conversation_id, conversation_id); + } + } + Err(err) => { + let error = JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: format!("error resuming thread: {err}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + } + } + } + async fn archive_conversation( &self, request_id: RequestId, @@ -963,6 +1112,7 @@ impl CodexMessageProcessor { conversation_id, rollout_path, } = params; + let conversation_id = self.resolve_conversation_id_alias(conversation_id).await; if self .conversation_manager @@ -1413,6 +1563,52 @@ impl CodexMessageProcessor { } } +fn parse_rfc3339_timestamp_seconds(value: &str) -> i64 { + DateTime::parse_from_rfc3339(value) + .map(|timestamp| timestamp.timestamp()) + .unwrap_or_default() +} + +fn thread_resume_response_thread( + thread_id: &str, + entry: Option<&code_core::SessionIndexEntry>, + config: &Config, + rollout_path: PathBuf, +) -> Thread { + let created_at = entry + .map(|item| parse_rfc3339_timestamp_seconds(&item.created_at)) + .unwrap_or_default(); + let updated_at = entry + .map(|item| parse_rfc3339_timestamp_seconds(&item.last_event_at)) + .unwrap_or(created_at); + + Thread { + id: thread_id.to_string(), + preview: entry + .and_then(|item| item.last_user_snippet.clone()) + .unwrap_or_default(), + model_provider: entry + .and_then(|item| item.model_provider.clone()) + .unwrap_or_else(|| config.model_provider_id.clone()), + created_at, + updated_at, + path: Some(rollout_path), + cwd: entry + .map(|item| item.cwd_real.clone()) + .unwrap_or_else(|| config.cwd.clone()), + cli_version: env!("CARGO_PKG_VERSION").to_string(), + source: entry + .map(|item| item.session_source.clone().into()) + .unwrap_or(code_app_server_protocol::SessionSource::AppServer), + git_info: entry.map(|item| code_app_server_protocol::GitInfo { + sha: None, + branch: item.git_branch.clone(), + origin_url: None, + }), + turns: Vec::new(), + } +} + impl CodexMessageProcessor { // Minimal compatibility layer: translate SendUserTurn into our current // flow by submitting only the user items. We intentionally do not attempt @@ -1429,6 +1625,7 @@ impl CodexMessageProcessor { items, .. } = params; + let conversation_id = self.resolve_conversation_id_alias(conversation_id).await; let Ok(conversation) = self .conversation_manager @@ -1923,6 +2120,7 @@ mod tests { use code_core::auth::CodexAuth; use code_core::auth::RefreshTokenError; use code_core::config::ConfigOverrides; + use code_core::SessionIndexEntry; use code_protocol::mcp_protocol::RemoveConversationListenerParams; use code_protocol::protocol::SessionSource; use mcp_types::RequestId; @@ -2179,6 +2377,51 @@ mod tests { vec!["selected".to_string()] ); } + + #[test] + fn thread_resume_response_thread_uses_catalog_metadata() { + let config = + Config::load_with_cli_overrides(Vec::new(), ConfigOverrides::default()) + .expect("load default config"); + let entry = SessionIndexEntry { + session_id: Uuid::new_v4(), + rollout_path: std::path::PathBuf::from("sessions/test.jsonl"), + snapshot_path: None, + created_at: "2026-04-03T10:00:00.000Z".to_string(), + last_event_at: "2026-04-03T10:05:00.000Z".to_string(), + cwd_real: std::path::PathBuf::from("/tmp/test-thread"), + cwd_display: "/tmp/test-thread".to_string(), + git_project_root: None, + git_branch: Some("main".to_string()), + model_provider: Some("openai".to_string()), + session_source: SessionSource::Mcp, + message_count: 3, + user_message_count: 1, + last_user_snippet: Some("resume me".to_string()), + nickname: None, + sync_origin_device: None, + sync_version: 0, + archived: false, + deleted: false, + }; + + let thread = thread_resume_response_thread( + &entry.session_id.to_string(), + Some(&entry), + &config, + std::path::PathBuf::from("/tmp/test.jsonl"), + ); + + assert_eq!(thread.id, entry.session_id.to_string()); + assert_eq!(thread.preview, "resume me"); + assert_eq!(thread.model_provider, "openai"); + assert_eq!(thread.cwd, std::path::PathBuf::from("/tmp/test-thread")); + assert_eq!(thread.path, Some(std::path::PathBuf::from("/tmp/test.jsonl"))); + assert_eq!(thread.source, code_app_server_protocol::SessionSource::AppServer); + assert_eq!(thread.git_info.and_then(|info| info.branch), Some("main".to_string())); + assert_eq!(thread.created_at, 1_775_210_400); + assert_eq!(thread.updated_at, 1_775_210_700); + } } impl IntoWireAuthMode for code_protocol::mcp_protocol::AuthMode { @@ -2227,6 +2470,35 @@ fn map_ask_for_approval_to_wire(a: core_protocol::AskForApproval) -> code_protoc } } +fn map_sandbox_policy_to_wire( + policy: core_protocol::SandboxPolicy, +) -> code_protocol::protocol::SandboxPolicy { + match policy { + core_protocol::SandboxPolicy::DangerFullAccess => { + code_protocol::protocol::SandboxPolicy::DangerFullAccess + } + core_protocol::SandboxPolicy::ReadOnly => code_protocol::protocol::SandboxPolicy::ReadOnly, + core_protocol::SandboxPolicy::WorkspaceWrite { + writable_roots, + network_access, + exclude_tmpdir_env_var, + exclude_slash_tmp, + allow_git_writes, + } => code_protocol::protocol::SandboxPolicy::WorkspaceWrite { + writable_roots: writable_roots + .into_iter() + .filter_map(|path| { + code_utils_absolute_path::AbsolutePathBuf::from_absolute_path(path).ok() + }) + .collect(), + network_access, + exclude_tmpdir_env_var, + exclude_slash_tmp, + allow_git_writes, + }, + } +} + fn map_reasoning_effort_to_wire( effort: code_core::config_types::ReasoningEffort, ) -> code_protocol::config_types::ReasoningEffort { diff --git a/code-rs/app-server/src/message_processor.rs b/code-rs/app-server/src/message_processor.rs index be58452049a..839975dbd24 100644 --- a/code-rs/app-server/src/message_processor.rs +++ b/code-rs/app-server/src/message_processor.rs @@ -30,6 +30,7 @@ use code_app_server_protocol::ExternalAgentConfigImportParams; use code_app_server_protocol::GetAccountParams; use code_app_server_protocol::LoginAccountParams; use code_app_server_protocol::MergeStrategy; +use code_app_server_protocol::ThreadResumeParams; use code_app_server_protocol::ToolsV2; use code_app_server_protocol::AskForApproval as V2AskForApproval; use code_app_server_protocol::WriteStatus; @@ -273,6 +274,7 @@ impl MessageProcessor { let is_v2_request = matches!( request.method.as_str(), "config/read" + | "thread/resume" | "configRequirements/read" | "config/value/write" | "config/batchWrite" @@ -295,6 +297,26 @@ impl MessageProcessor { } match request.method.as_str() { + "thread/resume" => { + let params_value = request.params.clone().unwrap_or_else(|| json!({})); + let params: ThreadResumeParams = match serde_json::from_value(params_value) { + Ok(params) => params, + Err(err) => { + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("Invalid thread/resume params: {err}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return true; + } + }; + + self.code_message_processor + .thread_resume_v2(request_id, params) + .await; + true + } "config/read" => { let params_value = request.params.clone().unwrap_or_else(|| json!({})); let params: ConfigReadParams = match serde_json::from_value(params_value) { From a41cc7b2e6eb0a37b5459713a7869420fcb3e015 Mon Sep 17 00:00:00 2001 From: Chris Busillo Date: Fri, 3 Apr 2026 10:00:15 -0400 Subject: [PATCH 2/2] fix(app-server): honor thread/resume overrides --- .../app-server/src/code_message_processor.rs | 226 +++++++++++++++--- 1 file changed, 198 insertions(+), 28 deletions(-) diff --git a/code-rs/app-server/src/code_message_processor.rs b/code-rs/app-server/src/code_message_processor.rs index 45dd2c9be11..aa1ad3c5fc5 100644 --- a/code-rs/app-server/src/code_message_processor.rs +++ b/code-rs/app-server/src/code_message_processor.rs @@ -563,7 +563,7 @@ impl CodexMessageProcessor { } async fn process_new_conversation(&self, request_id: RequestId, params: NewConversationParams) { - let config = match derive_config_from_params(params, self.code_linux_sandbox_exe.clone()) { + let config = match derive_config_from_params(params, None, self.code_linux_sandbox_exe.clone()) { Ok(config) => config, Err(err) => { let error = JSONRPCErrorError { @@ -927,7 +927,7 @@ impl CodexMessageProcessor { async fn resume_conversation(&self, request_id: RequestId, params: ResumeConversationParams) { let overrides = params.overrides.unwrap_or_default(); - let config = match derive_config_from_params(overrides, self.code_linux_sandbox_exe.clone()) { + let config = match derive_config_from_params(overrides, None, self.code_linux_sandbox_exe.clone()) { Ok(config) => config, Err(err) => { let error = JSONRPCErrorError { @@ -982,27 +982,32 @@ impl CodexMessageProcessor { params: ThreadResumeParams, ) { let unsupported_history = params.history.is_some(); - let thread_id = params.thread_id.clone(); + let requested_thread_id = params.thread_id.clone(); let catalog = SessionCatalog::new(self.config.code_home.clone()); - let catalog_entry = match catalog.find_by_id(&thread_id).await { - Ok(entry) => entry, - Err(err) => { - let error = JSONRPCErrorError { - code: INTERNAL_ERROR_CODE, - message: format!("failed to resolve thread: {err}"), - data: None, - }; - self.outgoing.send_error(request_id, error).await; - return; + let requested_catalog_entry = if params.path.is_some() { + None + } else { + match catalog.find_by_id(&requested_thread_id).await { + Ok(entry) => entry, + Err(err) => { + let error = JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: format!("failed to resolve thread: {err}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + } } }; - let rollout_path = match catalog_entry - .as_ref() - .map(|entry| catalog.entry_rollout_path(entry)) - .or_else(|| params.path.clone()) - { + let rollout_path = match thread_resume_rollout_path( + params.path.clone(), + requested_catalog_entry + .as_ref() + .map(|entry| catalog.entry_rollout_path(entry)), + ) { Some(path) => path, None => { let message = if unsupported_history { @@ -1019,6 +1024,11 @@ impl CodexMessageProcessor { return; } }; + let canonical_thread_id = thread_resume_canonical_thread_id( + &requested_thread_id, + &rollout_path, + requested_catalog_entry.as_ref(), + ); let overrides = NewConversationParams { model: params.model.clone(), @@ -1035,7 +1045,11 @@ impl CodexMessageProcessor { dynamic_tools: None, include_apply_patch_tool: None, }; - let config = match derive_config_from_params(overrides, self.code_linux_sandbox_exe.clone()) { + let config = match derive_config_from_params( + overrides, + params.model_provider.clone(), + self.code_linux_sandbox_exe.clone(), + ) { Ok(config) => config, Err(err) => { let error = JSONRPCErrorError { @@ -1062,8 +1076,20 @@ impl CodexMessageProcessor { session_configured: _, .. }) => { + let catalog_entry = match catalog.find_by_id(&canonical_thread_id).await { + Ok(entry) => entry, + Err(err) => { + let error = JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: format!("failed to resolve resumed thread: {err}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + } + }; let thread = thread_resume_response_thread( - &thread_id, + &canonical_thread_id, catalog_entry.as_ref(), &config, rollout_path, @@ -1083,7 +1109,8 @@ impl CodexMessageProcessor { ) .await; - if let Ok(requested_conversation_id) = ConversationId::from_string(&thread_id) + if let Ok(requested_conversation_id) = + ConversationId::from_string(&requested_thread_id) && requested_conversation_id != conversation_id { self.resumed_conversation_aliases @@ -1569,6 +1596,24 @@ fn parse_rfc3339_timestamp_seconds(value: &str) -> i64 { .unwrap_or_default() } +fn thread_resume_rollout_path( + requested_path: Option, + catalog_rollout_path: Option, +) -> Option { + requested_path.or(catalog_rollout_path) +} + +fn thread_resume_canonical_thread_id( + requested_thread_id: &str, + rollout_path: &std::path::Path, + catalog_entry: Option<&code_core::SessionIndexEntry>, +) -> String { + conversation_id_from_rollout_path(rollout_path) + .map(|conversation_id| conversation_id.to_string()) + .or_else(|| catalog_entry.map(|entry| entry.session_id.to_string())) + .unwrap_or_else(|| requested_thread_id.to_string()) +} + fn thread_resume_response_thread( thread_id: &str, entry: Option<&code_core::SessionIndexEntry>, @@ -1583,13 +1628,13 @@ fn thread_resume_response_thread( .unwrap_or(created_at); Thread { - id: thread_id.to_string(), + id: entry + .map(|item| item.session_id.to_string()) + .unwrap_or_else(|| thread_id.to_string()), preview: entry .and_then(|item| item.last_user_snippet.clone()) .unwrap_or_default(), - model_provider: entry - .and_then(|item| item.model_provider.clone()) - .unwrap_or_else(|| config.model_provider_id.clone()), + model_provider: config.model_provider_id.clone(), created_at, updated_at, path: Some(rollout_path), @@ -1821,6 +1866,7 @@ async fn apply_bespoke_event_handling( fn derive_config_from_params( params: NewConversationParams, + model_provider: Option, code_linux_sandbox_exe: Option, ) -> std::io::Result { let NewConversationParams { @@ -1842,7 +1888,7 @@ fn derive_config_from_params( cwd: cwd.map(PathBuf::from), approval_policy: approval_policy.map(map_ask_for_approval_from_wire), sandbox_mode, - model_provider: None, + model_provider, code_linux_sandbox_exe, base_instructions, include_plan_tool, @@ -2422,6 +2468,128 @@ mod tests { assert_eq!(thread.created_at, 1_775_210_400); assert_eq!(thread.updated_at, 1_775_210_700); } + + #[test] + fn thread_resume_rollout_path_prefers_explicit_path() { + let explicit_path = std::path::PathBuf::from("/tmp/explicit.jsonl"); + let catalog_path = std::path::PathBuf::from("/tmp/catalog.jsonl"); + + let rollout_path = + thread_resume_rollout_path(Some(explicit_path.clone()), Some(catalog_path)); + + assert_eq!(rollout_path, Some(explicit_path)); + } + + #[test] + fn thread_resume_response_thread_uses_canonical_thread_id() { + let config = + Config::load_with_cli_overrides(Vec::new(), ConfigOverrides::default()) + .expect("load default config"); + let entry = SessionIndexEntry { + session_id: Uuid::new_v4(), + rollout_path: std::path::PathBuf::from("sessions/test.jsonl"), + snapshot_path: None, + created_at: "2026-04-03T10:00:00.000Z".to_string(), + last_event_at: "2026-04-03T10:05:00.000Z".to_string(), + cwd_real: std::path::PathBuf::from("/tmp/test-thread"), + cwd_display: "/tmp/test-thread".to_string(), + git_project_root: None, + git_branch: Some("main".to_string()), + model_provider: Some("openai".to_string()), + session_source: SessionSource::Mcp, + message_count: 3, + user_message_count: 1, + last_user_snippet: Some("resume me".to_string()), + nickname: None, + sync_origin_device: None, + sync_version: 0, + archived: false, + deleted: false, + }; + + let thread = thread_resume_response_thread( + "placeholder-thread-id", + Some(&entry), + &config, + std::path::PathBuf::from("/tmp/test.jsonl"), + ); + + assert_eq!(thread.id, entry.session_id.to_string()); + } + + #[test] + fn thread_resume_canonical_thread_id_prefers_rollout_path() { + let entry = SessionIndexEntry { + session_id: Uuid::parse_str("11111111-1111-4111-8111-111111111111") + .expect("valid uuid"), + rollout_path: std::path::PathBuf::from("sessions/wrong.jsonl"), + snapshot_path: None, + created_at: "2026-04-03T10:00:00.000Z".to_string(), + last_event_at: "2026-04-03T10:05:00.000Z".to_string(), + cwd_real: std::path::PathBuf::from("/tmp/test-thread"), + cwd_display: "/tmp/test-thread".to_string(), + git_project_root: None, + git_branch: Some("main".to_string()), + model_provider: Some("openai".to_string()), + session_source: SessionSource::Mcp, + message_count: 3, + user_message_count: 1, + last_user_snippet: Some("resume me".to_string()), + nickname: None, + sync_origin_device: None, + sync_version: 0, + archived: false, + deleted: false, + }; + + let canonical_thread_id = thread_resume_canonical_thread_id( + &entry.session_id.to_string(), + std::path::Path::new( + "/tmp/rollout-2026-04-03T09-10-00Z-22222222-2222-4222-8222-222222222222.jsonl", + ), + Some(&entry), + ); + + assert_eq!(canonical_thread_id, "22222222-2222-4222-8222-222222222222"); + } + + #[test] + fn derive_config_from_params_applies_model_provider_override() { + let params = NewConversationParams { + model: None, + profile: None, + cwd: None, + approval_policy: None, + sandbox: None, + config: None, + base_instructions: None, + include_plan_tool: None, + include_apply_patch_tool: None, + dynamic_tools: None, + }; + + let config = derive_config_from_params( + params, + Some("oss".to_string()), + None, + ) + .expect("derive config with model provider override"); + + assert_eq!(config.model_provider_id, "oss"); + } + + #[test] + fn conversation_id_from_rollout_path_parses_hyphenated_uuid_suffix() { + let conversation_id = conversation_id_from_rollout_path(std::path::Path::new( + "/tmp/rollout-2026-04-03T09-10-00Z-22222222-2222-4222-8222-222222222222.jsonl", + )) + .expect("conversation id should parse from rollout path"); + + assert_eq!( + conversation_id.to_string(), + "22222222-2222-4222-8222-222222222222" + ); + } } impl IntoWireAuthMode for code_protocol::mcp_protocol::AuthMode { @@ -2613,8 +2781,10 @@ fn rate_limit_snapshot_from_event( fn conversation_id_from_rollout_path(path: &std::path::Path) -> Option { let stem = path.file_stem()?.to_str()?; - let (_, id) = stem.rsplit_once('-')?; - ConversationId::from_string(id).ok() + + stem.match_indices('-') + .rev() + .find_map(|(index, _)| ConversationId::from_string(&stem[index + 1..]).ok()) } fn snippet_from_rollout_tail(tail: &[serde_json::Value]) -> Option {