From d35eae36fdeb6c0899c720a9e49eadfe4e751a31 Mon Sep 17 00:00:00 2001 From: Chris Busillo Date: Fri, 3 Apr 2026 10:54:55 -0400 Subject: [PATCH 1/2] feat(app-server): add review and model parity APIs Finish the thread/resume compatibility shim and add native model/list and review/start handlers to the local app-server compatibility layer. --- code-rs/Cargo.lock | 1 + code-rs/app-server/Cargo.toml | 1 + .../app-server/src/code_message_processor.rs | 885 +++++++++++++++++- code-rs/app-server/src/message_processor.rs | 66 ++ 4 files changed, 948 insertions(+), 5 deletions(-) diff --git a/code-rs/Cargo.lock b/code-rs/Cargo.lock index 61de029d383..20637765f69 100644 --- a/code-rs/Cargo.lock +++ b/code-rs/Cargo.lock @@ -897,6 +897,7 @@ version = "0.0.0" dependencies = [ "anyhow", "assert_cmd", + "chrono", "clap", "code-app-server-protocol", "code-arg0", diff --git a/code-rs/app-server/Cargo.toml b/code-rs/app-server/Cargo.toml index 13e6e272f09..56f0a6c5c67 100644 --- a/code-rs/app-server/Cargo.toml +++ b/code-rs/app-server/Cargo.toml @@ -26,6 +26,7 @@ code-app-server-protocol = { workspace = true } code-protocol = { workspace = true } code-utils-absolute-path = { workspace = true } code-utils-json-to-toml = { workspace = true } +chrono = { workspace = true } futures = { workspace = true } # We should only be using mcp-types for JSON-RPC types: it would be nice to # split this out into a separate crate at some point. 
diff --git a/code-rs/app-server/src/code_message_processor.rs b/code-rs/app-server/src/code_message_processor.rs index 8737ef21527..c3cd4bce9ac 100644 --- a/code-rs/app-server/src/code_message_processor.rs +++ b/code-rs/app-server/src/code_message_processor.rs @@ -12,15 +12,34 @@ use code_app_server_protocol::GetAccountResponse; use code_app_server_protocol::LoginAccountParams; use code_app_server_protocol::LoginAccountResponse; use code_app_server_protocol::LogoutAccountResponse; +use code_app_server_protocol::Model as V2Model; +use code_app_server_protocol::ModelListParams; +use code_app_server_protocol::ModelListResponse; +use code_app_server_protocol::ModelUpgradeInfo; +use code_app_server_protocol::ReasoningEffortOption; +use code_app_server_protocol::ReviewStartParams; +use code_app_server_protocol::ReviewStartResponse; +use code_app_server_protocol::ReviewTarget as V2ReviewTarget; +use code_app_server_protocol::Thread; +use code_app_server_protocol::ThreadItem; +use code_app_server_protocol::ThreadResumeParams; +use code_app_server_protocol::ThreadResumeResponse; +use code_app_server_protocol::Turn; +use code_app_server_protocol::TurnStatus; +use code_app_server_protocol::UserInput as V2UserInput; use code_app_server_protocol::ToolRequestUserInputOption; use code_app_server_protocol::ToolRequestUserInputParams; use code_app_server_protocol::ToolRequestUserInputQuestion; use code_app_server_protocol::ToolRequestUserInputResponse; +use code_common::model_presets::all_model_presets; +use code_common::model_presets::model_preset_available_for_auth; +use code_app_server_protocol::AuthMode; use code_core::AuthManager; use code_core::CodexConversation; use code_core::ConversationManager; use code_core::NewConversation; use code_core::RolloutRecorder; +use code_core::SessionCatalog; use code_core::Cursor; use code_core::config::Config; use code_core::config::ConfigOverrides; @@ -37,6 +56,7 @@ use code_core::protocol::ExecApprovalRequestEvent; use 
code_protocol::mcp_protocol::FuzzyFileSearchParams; use code_protocol::mcp_protocol::FuzzyFileSearchResponse; use code_protocol::protocol::ReviewDecision; +use code_protocol::protocol::ReviewTarget as CoreReviewTarget; use mcp_types::JSONRPCErrorError; use mcp_types::RequestId; use code_login::CLIENT_ID; @@ -49,6 +69,7 @@ use tokio::time::Duration; use tokio::time::timeout; use tracing::error; use uuid::Uuid; +use chrono::DateTime; use crate::error_code::INTERNAL_ERROR_CODE; use crate::error_code::INVALID_REQUEST_ERROR_CODE; @@ -145,6 +166,7 @@ pub struct CodexMessageProcessor { active_login: Arc>>, // Queue of pending interrupt requests per conversation. We reply when TurnAborted arrives. pending_interrupts: Arc>>>, + resumed_conversation_aliases: Arc>>, #[allow(dead_code)] pending_fuzzy_searches: Arc>>>, } @@ -166,10 +188,23 @@ impl CodexMessageProcessor { conversation_listeners: HashMap::new(), active_login: Arc::new(Mutex::new(None)), pending_interrupts: Arc::new(Mutex::new(HashMap::new())), + resumed_conversation_aliases: Arc::new(Mutex::new(HashMap::new())), pending_fuzzy_searches: Arc::new(Mutex::new(HashMap::new())), } } + async fn resolve_conversation_id_alias( + &self, + conversation_id: ConversationId, + ) -> ConversationId { + self.resumed_conversation_aliases + .lock() + .await + .get(&conversation_id) + .copied() + .unwrap_or(conversation_id) + } + pub async fn process_request(&mut self, request: ClientRequest) { self.process_request_for_connection(ConnectionId(0), request) .await; @@ -544,7 +579,11 @@ impl CodexMessageProcessor { } async fn process_new_conversation(&self, request_id: RequestId, params: NewConversationParams) { - let config = match derive_config_from_params(params, self.code_linux_sandbox_exe.clone()) { + let config = match derive_config_from_params( + params, + None, + self.code_linux_sandbox_exe.clone(), + ) { Ok(config) => config, Err(err) => { let error = JSONRPCErrorError { @@ -589,6 +628,7 @@ impl CodexMessageProcessor { 
conversation_id, items, } = params; + let conversation_id = self.resolve_conversation_id_alias(conversation_id).await; let Ok(conversation) = self .conversation_manager .get_conversation(conversation_id) @@ -682,6 +722,7 @@ impl CodexMessageProcessor { params: InterruptConversationParams, ) { let InterruptConversationParams { conversation_id } = params; + let conversation_id = self.resolve_conversation_id_alias(conversation_id).await; let Ok(conversation) = self .conversation_manager .get_conversation(conversation_id) @@ -709,6 +750,7 @@ impl CodexMessageProcessor { params: AddConversationListenerParams, ) { let AddConversationListenerParams { conversation_id } = params; + let conversation_id = self.resolve_conversation_id_alias(conversation_id).await; let Ok(conversation) = self .conversation_manager .get_conversation(conversation_id) @@ -905,7 +947,11 @@ impl CodexMessageProcessor { async fn resume_conversation(&self, request_id: RequestId, params: ResumeConversationParams) { let overrides = params.overrides.unwrap_or_default(); - let config = match derive_config_from_params(overrides, self.code_linux_sandbox_exe.clone()) { + let config = match derive_config_from_params( + overrides, + None, + self.code_linux_sandbox_exe.clone(), + ) { Ok(config) => config, Err(err) => { let error = JSONRPCErrorError { @@ -954,6 +1000,389 @@ impl CodexMessageProcessor { } } + pub(crate) async fn thread_resume_v2( + &self, + request_id: RequestId, + params: ThreadResumeParams, + ) { + let unsupported_history = params.history.is_some(); + let thread_id = params.thread_id.clone(); + let catalog = SessionCatalog::new(self.config.code_home.clone()); + + let catalog_entry = match catalog.find_by_id(&thread_id).await { + Ok(entry) => entry, + Err(err) => { + let error = JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: format!("failed to resolve thread: {err}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + } + }; + + let rollout_path = 
match thread_resume_rollout_path( + params.path.clone(), + catalog_entry + .as_ref() + .map(|entry| catalog.entry_rollout_path(entry)), + ) { + Some(path) => path, + None => { + let message = if unsupported_history { + "thread/resume.history is not supported by the Every Code app-server without a rollout path" + } else { + "thread not found" + }; + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: message.to_string(), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + } + }; + + let overrides = NewConversationParams { + model: params.model.clone(), + profile: None, + cwd: params.cwd.clone(), + approval_policy: params + .approval_policy + .clone() + .map(|approval_policy| approval_policy.to_core()), + sandbox: params.sandbox.map(|sandbox| sandbox.to_core()), + config: params.config.clone(), + base_instructions: params.base_instructions.clone(), + include_plan_tool: None, + dynamic_tools: None, + include_apply_patch_tool: None, + }; + let config = match derive_config_from_params( + overrides, + params.model_provider.clone(), + self.code_linux_sandbox_exe.clone(), + ) { + Ok(config) => config, + Err(err) => { + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("error deriving config: {err}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + } + }; + + match self + .conversation_manager + .resume_conversation_from_rollout( + config.clone(), + rollout_path.clone(), + Arc::clone(&self.auth_manager), + ) + .await + { + Ok(NewConversation { + conversation_id, + session_configured: _, + .. 
+ }) => { + let canonical_thread_id = thread_resume_canonical_thread_id( + &thread_id, + &rollout_path, + catalog_entry.as_ref(), + ); + let thread = thread_resume_response_thread( + &canonical_thread_id, + catalog_entry.as_ref(), + &config, + rollout_path, + ); + self.outgoing + .send_response( + request_id, + ThreadResumeResponse { + thread, + model: config.model.clone(), + model_provider: config.model_provider_id.clone(), + cwd: config.cwd.clone(), + approval_policy: map_ask_for_approval_to_wire(config.approval_policy).into(), + sandbox: map_sandbox_policy_to_wire(config.sandbox_policy.clone()).into(), + reasoning_effort: Some(config.model_reasoning_effort.into()), + }, + ) + .await; + + if let Ok(requested_conversation_id) = ConversationId::from_string(&thread_id) + && requested_conversation_id != conversation_id + { + self.resumed_conversation_aliases + .lock() + .await + .insert(requested_conversation_id, conversation_id); + } + } + Err(err) => { + let error = JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: format!("error resuming thread: {err}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + } + } + } + + pub(crate) async fn model_list_v2(&self, request_id: RequestId, params: ModelListParams) { + let ModelListParams { + limit, + cursor, + include_hidden, + } = params; + let (auth_mode, supports_pro_only_models) = + model_picker_auth_state(&self.config, &self.auth_manager); + let mut models: Vec = all_model_presets() + .iter() + .filter(|preset| { + model_preset_available_for_auth(preset, auth_mode, supports_pro_only_models) + }) + .filter(|preset| include_hidden.unwrap_or(false) || preset.show_in_picker) + .cloned() + .map(model_preset_to_v2_model) + .collect(); + + mark_default_model(&mut models); + + let total = models.len(); + if total == 0 { + self.outgoing + .send_response( + request_id, + ModelListResponse { + data: Vec::new(), + next_cursor: None, + }, + ) + .await; + return; + } + + let effective_limit = 
limit.unwrap_or(total as u32).max(1) as usize; + let effective_limit = effective_limit.min(total); + let start = match cursor { + Some(cursor) => match cursor.parse::() { + Ok(index) => index, + Err(_) => { + self.outgoing + .send_error( + request_id, + JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("invalid cursor: {cursor}"), + data: None, + }, + ) + .await; + return; + } + }, + None => 0, + }; + + if start > total { + self.outgoing + .send_error( + request_id, + JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("cursor {start} exceeds total models {total}"), + data: None, + }, + ) + .await; + return; + } + + let end = start.saturating_add(effective_limit).min(total); + let next_cursor = if end < total { + Some(end.to_string()) + } else { + None + }; + + self.outgoing + .send_response( + request_id, + ModelListResponse { + data: models[start..end].to_vec(), + next_cursor, + }, + ) + .await; + } + + pub(crate) async fn review_start_v2(&self, request_id: RequestId, params: ReviewStartParams) { + let ReviewStartParams { + thread_id, + target, + delivery, + } = params; + let resolved_thread_id = match ConversationId::from_string(&thread_id) { + Ok(conversation_id) => self.resolve_conversation_id_alias(conversation_id).await, + Err(_) => { + self.outgoing + .send_error( + request_id, + JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("invalid thread id: {thread_id}"), + data: None, + }, + ) + .await; + return; + } + }; + + let (review_request, display_text) = match review_request_from_target(target) { + Ok(value) => value, + Err(err) => { + self.outgoing.send_error(request_id, err).await; + return; + } + }; + + let delivery = delivery.unwrap_or(code_app_server_protocol::ReviewDelivery::Inline); + match delivery { + code_app_server_protocol::ReviewDelivery::Inline => { + let conversation = match self + .conversation_manager + .get_conversation(resolved_thread_id) + .await + { + 
Ok(conversation) => conversation, + Err(_) => { + self.outgoing + .send_error( + request_id, + JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("thread not found: {thread_id}"), + data: None, + }, + ) + .await; + return; + } + }; + + let turn_id = match conversation.submit(Op::Review { review_request }).await { + Ok(turn_id) => turn_id, + Err(err) => { + self.outgoing + .send_error( + request_id, + JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: format!("failed to start review: {err}"), + data: None, + }, + ) + .await; + return; + } + }; + + self.outgoing + .send_response( + request_id, + ReviewStartResponse { + turn: build_review_turn(turn_id, &display_text), + review_thread_id: thread_id, + }, + ) + .await; + } + code_app_server_protocol::ReviewDelivery::Detached => { + let mut config = (*self.config).clone(); + let catalog = SessionCatalog::new(self.config.code_home.clone()); + if let Ok(Some(entry)) = catalog.find_by_id(&thread_id).await { + config.cwd = entry.cwd_real; + } + + let NewConversation { + conversation_id, + .. 
+ } = match self.conversation_manager.new_conversation(config).await { + Ok(conversation) => conversation, + Err(err) => { + self.outgoing + .send_error( + request_id, + JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: format!("failed to create detached review thread: {err}"), + data: None, + }, + ) + .await; + return; + } + }; + let conversation = match self + .conversation_manager + .get_conversation(conversation_id) + .await + { + Ok(conversation) => conversation, + Err(err) => { + self.outgoing + .send_error( + request_id, + JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: format!("failed to load detached review thread: {err}"), + data: None, + }, + ) + .await; + return; + } + }; + let turn_id = match conversation.submit(Op::Review { review_request }).await { + Ok(turn_id) => turn_id, + Err(err) => { + self.outgoing + .send_error( + request_id, + JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: format!("failed to start detached review: {err}"), + data: None, + }, + ) + .await; + return; + } + }; + + self.outgoing + .send_response( + request_id, + ReviewStartResponse { + turn: build_review_turn(turn_id, &display_text), + review_thread_id: conversation_id.to_string(), + }, + ) + .await; + } + } + } + async fn archive_conversation( &self, request_id: RequestId, @@ -963,6 +1392,7 @@ impl CodexMessageProcessor { conversation_id, rollout_path, } = params; + let conversation_id = self.resolve_conversation_id_alias(conversation_id).await; if self .conversation_manager @@ -1413,6 +1843,227 @@ impl CodexMessageProcessor { } } +fn parse_rfc3339_timestamp_seconds(value: &str) -> i64 { + DateTime::parse_from_rfc3339(value) + .map(|timestamp| timestamp.timestamp()) + .unwrap_or_default() +} + +fn thread_resume_rollout_path( + explicit_path: Option, + catalog_path: Option, +) -> Option { + explicit_path.or(catalog_path) +} + +fn thread_resume_canonical_thread_id( + requested_thread_id: &str, + rollout_path: &std::path::Path, + entry: 
Option<&code_core::SessionIndexEntry>, +) -> String { + conversation_id_from_rollout_path(rollout_path) + .map(|conversation_id| conversation_id.to_string()) + .or_else(|| entry.map(|item| item.session_id.to_string())) + .unwrap_or_else(|| requested_thread_id.to_string()) +} + +fn thread_resume_response_thread( + thread_id: &str, + entry: Option<&code_core::SessionIndexEntry>, + config: &Config, + rollout_path: PathBuf, +) -> Thread { + let created_at = entry + .map(|item| parse_rfc3339_timestamp_seconds(&item.created_at)) + .unwrap_or_default(); + let updated_at = entry + .map(|item| parse_rfc3339_timestamp_seconds(&item.last_event_at)) + .unwrap_or(created_at); + + Thread { + id: thread_id.to_string(), + preview: entry + .and_then(|item| item.last_user_snippet.clone()) + .unwrap_or_default(), + model_provider: entry + .and_then(|item| item.model_provider.clone()) + .unwrap_or_else(|| config.model_provider_id.clone()), + created_at, + updated_at, + path: Some(rollout_path), + cwd: entry + .map(|item| item.cwd_real.clone()) + .unwrap_or_else(|| config.cwd.clone()), + cli_version: env!("CARGO_PKG_VERSION").to_string(), + source: entry + .map(|item| item.session_source.clone().into()) + .unwrap_or(code_app_server_protocol::SessionSource::AppServer), + git_info: entry.map(|item| code_app_server_protocol::GitInfo { + sha: None, + branch: item.git_branch.clone(), + origin_url: None, + }), + turns: Vec::new(), + } +} + +fn model_picker_auth_state( + config: &Config, + auth_manager: &AuthManager, +) -> (Option, bool) { + let preferred_auth_mode = if config.using_chatgpt_auth { + AuthMode::Chatgpt + } else { + AuthMode::ApiKey + }; + let auth_mode = auth_manager + .auth() + .map(|auth| auth.mode) + .or(Some(preferred_auth_mode)); + let supports_pro_only_models = auth_manager.supports_pro_only_models(); + (auth_mode, supports_pro_only_models) +} + +fn model_preset_to_v2_model(preset: code_common::model_presets::ModelPreset) -> V2Model { + V2Model { + id: preset.id.clone(), 
+ model: preset.model, + upgrade: preset.upgrade.as_ref().map(|upgrade| upgrade.id.clone()), + upgrade_info: preset.upgrade.map(|upgrade| ModelUpgradeInfo { + model: upgrade.id, + upgrade_copy: None, + model_link: None, + migration_markdown: None, + }), + availability_nux: None, + display_name: preset.display_name, + description: preset.description, + hidden: !preset.show_in_picker, + supported_reasoning_efforts: preset + .supported_reasoning_efforts + .into_iter() + .map(|preset| ReasoningEffortOption { + reasoning_effort: preset.effort.into(), + description: preset.description, + }) + .collect(), + default_reasoning_effort: preset.default_reasoning_effort.into(), + input_modalities: code_protocol::openai_models::default_input_modalities(), + supports_personality: false, + is_default: preset.is_default, + } +} + +fn mark_default_model(models: &mut [V2Model]) { + for model in models.iter_mut() { + model.is_default = false; + } + if let Some(model) = models.iter_mut().find(|model| !model.hidden) { + model.is_default = true; + } else if let Some(model) = models.first_mut() { + model.is_default = true; + } +} + +fn review_request_from_target( + target: V2ReviewTarget, +) -> Result<(core_protocol::ReviewRequest, String), JSONRPCErrorError> { + fn invalid_request(message: String) -> JSONRPCErrorError { + JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message, + data: None, + } + } + + let (target, prompt, hint) = match target { + V2ReviewTarget::UncommittedChanges => ( + CoreReviewTarget::UncommittedChanges, + "Review the current workspace changes and highlight bugs, regressions, risky patterns, and missing tests before merge.".to_string(), + "current workspace changes".to_string(), + ), + V2ReviewTarget::BaseBranch { branch } => { + let branch = branch.trim().to_string(); + if branch.is_empty() { + return Err(invalid_request("branch must not be empty".to_string())); + } + ( + CoreReviewTarget::BaseBranch { + branch: branch.clone(), + }, + format!( + "Review 
the changes between the current branch and base branch {branch} and highlight bugs, regressions, risky patterns, and missing tests before merge." + ), + format!("base branch {branch}"), + ) + } + V2ReviewTarget::Commit { sha, title } => { + let sha = sha.trim().to_string(); + if sha.is_empty() { + return Err(invalid_request("sha must not be empty".to_string())); + } + let title = title.map(|value| value.trim().to_string()).filter(|value| !value.is_empty()); + let short_sha: String = sha.chars().take(12).collect(); + let prompt = match title.as_deref() { + Some(title) => format!( + "Review the changes introduced by commit {sha} ({title}) and highlight bugs, regressions, risky patterns, and missing tests before merge." + ), + None => format!( + "Review the changes introduced by commit {sha} and highlight bugs, regressions, risky patterns, and missing tests before merge." + ), + }; + ( + CoreReviewTarget::Commit { sha, title }, + prompt, + format!("commit {short_sha}"), + ) + } + V2ReviewTarget::Custom { instructions } => { + let instructions = instructions.trim().to_string(); + if instructions.is_empty() { + return Err(invalid_request("instructions must not be empty".to_string())); + } + ( + CoreReviewTarget::Custom { + instructions: instructions.clone(), + }, + instructions.clone(), + instructions, + ) + } + }; + + Ok(( + core_protocol::ReviewRequest { + target, + user_facing_hint: Some(hint.clone()), + prompt, + }, + hint, + )) +} + +fn build_review_turn(turn_id: String, display_text: &str) -> Turn { + let items = if display_text.is_empty() { + Vec::new() + } else { + vec![ThreadItem::UserMessage { + id: turn_id.clone(), + content: vec![V2UserInput::Text { + text: display_text.to_string(), + text_elements: Vec::new(), + }], + }] + }; + + Turn { + id: turn_id, + items, + error: None, + status: TurnStatus::InProgress, + } +} + impl CodexMessageProcessor { // Minimal compatibility layer: translate SendUserTurn into our current // flow by submitting only the user 
items. We intentionally do not attempt @@ -1429,6 +2080,7 @@ impl CodexMessageProcessor { items, .. } = params; + let conversation_id = self.resolve_conversation_id_alias(conversation_id).await; let Ok(conversation) = self .conversation_manager @@ -1624,6 +2276,7 @@ async fn apply_bespoke_event_handling( fn derive_config_from_params( params: NewConversationParams, + model_provider: Option, code_linux_sandbox_exe: Option, ) -> std::io::Result { let NewConversationParams { @@ -1645,7 +2298,7 @@ fn derive_config_from_params( cwd: cwd.map(PathBuf::from), approval_policy: approval_policy.map(map_ask_for_approval_from_wire), sandbox_mode, - model_provider: None, + model_provider, code_linux_sandbox_exe, base_instructions, include_plan_tool, @@ -1923,6 +2576,7 @@ mod tests { use code_core::auth::CodexAuth; use code_core::auth::RefreshTokenError; use code_core::config::ConfigOverrides; + use code_core::SessionIndexEntry; use code_protocol::mcp_protocol::RemoveConversationListenerParams; use code_protocol::protocol::SessionSource; use mcp_types::RequestId; @@ -2179,6 +2833,196 @@ mod tests { vec!["selected".to_string()] ); } + + #[test] + fn thread_resume_response_thread_uses_catalog_metadata() { + let config = + Config::load_with_cli_overrides(Vec::new(), ConfigOverrides::default()) + .expect("load default config"); + let entry = SessionIndexEntry { + session_id: Uuid::new_v4(), + rollout_path: std::path::PathBuf::from("sessions/test.jsonl"), + snapshot_path: None, + created_at: "2026-04-03T10:00:00.000Z".to_string(), + last_event_at: "2026-04-03T10:05:00.000Z".to_string(), + cwd_real: std::path::PathBuf::from("/tmp/test-thread"), + cwd_display: "/tmp/test-thread".to_string(), + git_project_root: None, + git_branch: Some("main".to_string()), + model_provider: Some("openai".to_string()), + session_source: SessionSource::Mcp, + message_count: 3, + user_message_count: 1, + last_user_snippet: Some("resume me".to_string()), + nickname: None, + sync_origin_device: None, + 
sync_version: 0, + archived: false, + deleted: false, + }; + + let thread = thread_resume_response_thread( + &entry.session_id.to_string(), + Some(&entry), + &config, + std::path::PathBuf::from("/tmp/test.jsonl"), + ); + + assert_eq!(thread.id, entry.session_id.to_string()); + assert_eq!(thread.preview, "resume me"); + assert_eq!(thread.model_provider, "openai"); + assert_eq!(thread.cwd, std::path::PathBuf::from("/tmp/test-thread")); + assert_eq!(thread.path, Some(std::path::PathBuf::from("/tmp/test.jsonl"))); + assert_eq!(thread.source, code_app_server_protocol::SessionSource::AppServer); + assert_eq!(thread.git_info.and_then(|info| info.branch), Some("main".to_string())); + assert_eq!(thread.created_at, 1_775_210_400); + assert_eq!(thread.updated_at, 1_775_210_700); + } + + #[test] + fn thread_resume_rollout_path_prefers_explicit_path() { + let explicit_path = std::path::PathBuf::from("/tmp/explicit.jsonl"); + let catalog_path = std::path::PathBuf::from("/tmp/catalog.jsonl"); + + let rollout_path = + thread_resume_rollout_path(Some(explicit_path.clone()), Some(catalog_path)); + + assert_eq!(rollout_path, Some(explicit_path)); + } + + #[test] + fn thread_resume_canonical_thread_id_prefers_rollout_path() { + let entry = SessionIndexEntry { + session_id: Uuid::parse_str("11111111-1111-4111-8111-111111111111") + .expect("valid uuid"), + rollout_path: std::path::PathBuf::from("sessions/wrong.jsonl"), + snapshot_path: None, + created_at: "2026-04-03T10:00:00.000Z".to_string(), + last_event_at: "2026-04-03T10:05:00.000Z".to_string(), + cwd_real: std::path::PathBuf::from("/tmp/test-thread"), + cwd_display: "/tmp/test-thread".to_string(), + git_project_root: None, + git_branch: Some("main".to_string()), + model_provider: Some("openai".to_string()), + session_source: SessionSource::Mcp, + message_count: 3, + user_message_count: 1, + last_user_snippet: Some("resume me".to_string()), + nickname: None, + sync_origin_device: None, + sync_version: 0, + archived: false, + 
deleted: false, + }; + + let canonical_thread_id = thread_resume_canonical_thread_id( + &entry.session_id.to_string(), + std::path::Path::new( + "/tmp/rollout-2026-04-03T09-10-00Z-22222222-2222-4222-8222-222222222222.jsonl", + ), + Some(&entry), + ); + + assert_eq!(canonical_thread_id, "22222222-2222-4222-8222-222222222222"); + } + + #[test] + fn derive_config_from_params_applies_model_provider_override() { + let params = NewConversationParams { + model: None, + profile: None, + cwd: None, + approval_policy: None, + sandbox: None, + config: None, + base_instructions: None, + include_plan_tool: None, + include_apply_patch_tool: None, + dynamic_tools: None, + }; + + let config = derive_config_from_params(params, Some("oss".to_string()), None) + .expect("derive config with model provider override"); + + assert_eq!(config.model_provider_id, "oss"); + } + + #[test] + fn mark_default_model_prefers_visible_models() { + let mut models = vec![ + V2Model { + id: "hidden".to_string(), + model: "hidden".to_string(), + upgrade: None, + upgrade_info: None, + availability_nux: None, + display_name: "Hidden".to_string(), + description: String::new(), + hidden: true, + supported_reasoning_efforts: Vec::new(), + default_reasoning_effort: code_protocol::config_types::ReasoningEffort::Minimal, + input_modalities: code_protocol::openai_models::default_input_modalities(), + supports_personality: false, + is_default: true, + }, + V2Model { + id: "visible".to_string(), + model: "visible".to_string(), + upgrade: None, + upgrade_info: None, + availability_nux: None, + display_name: "Visible".to_string(), + description: String::new(), + hidden: false, + supported_reasoning_efforts: Vec::new(), + default_reasoning_effort: code_protocol::config_types::ReasoningEffort::Minimal, + input_modalities: code_protocol::openai_models::default_input_modalities(), + supports_personality: false, + is_default: false, + }, + ]; + + mark_default_model(&mut models); + + assert!(!models[0].is_default); + 
assert!(models[1].is_default); + } + + #[test] + fn review_request_from_target_builds_workspace_prompt() { + let (request, display_text) = review_request_from_target(V2ReviewTarget::UncommittedChanges) + .expect("workspace review target should be accepted"); + + assert_eq!(display_text, "current workspace changes"); + assert_eq!( + request.user_facing_hint.as_deref(), + Some("current workspace changes") + ); + assert!(request.prompt.contains("current workspace changes")); + } + + #[test] + fn review_request_from_target_rejects_empty_custom_instructions() { + let error = review_request_from_target(V2ReviewTarget::Custom { + instructions: " ".to_string(), + }) + .expect_err("empty instructions should be rejected"); + + assert_eq!(error.message, "instructions must not be empty"); + } + + #[test] + fn conversation_id_from_rollout_path_parses_hyphenated_uuid_suffix() { + let conversation_id = conversation_id_from_rollout_path(std::path::Path::new( + "/tmp/rollout-2026-04-03T09-10-00Z-22222222-2222-4222-8222-222222222222.jsonl", + )) + .expect("conversation id should parse from rollout path"); + + assert_eq!( + conversation_id.to_string(), + "22222222-2222-4222-8222-222222222222" + ); + } } impl IntoWireAuthMode for code_protocol::mcp_protocol::AuthMode { @@ -2227,6 +3071,35 @@ fn map_ask_for_approval_to_wire(a: core_protocol::AskForApproval) -> code_protoc } } +fn map_sandbox_policy_to_wire( + policy: core_protocol::SandboxPolicy, +) -> code_protocol::protocol::SandboxPolicy { + match policy { + core_protocol::SandboxPolicy::DangerFullAccess => { + code_protocol::protocol::SandboxPolicy::DangerFullAccess + } + core_protocol::SandboxPolicy::ReadOnly => code_protocol::protocol::SandboxPolicy::ReadOnly, + core_protocol::SandboxPolicy::WorkspaceWrite { + writable_roots, + network_access, + exclude_tmpdir_env_var, + exclude_slash_tmp, + allow_git_writes, + } => code_protocol::protocol::SandboxPolicy::WorkspaceWrite { + writable_roots: writable_roots + .into_iter() + 
.filter_map(|path| { + code_utils_absolute_path::AbsolutePathBuf::from_absolute_path(path).ok() + }) + .collect(), + network_access, + exclude_tmpdir_env_var, + exclude_slash_tmp, + allow_git_writes, + }, + } +} + fn map_reasoning_effort_to_wire( effort: code_core::config_types::ReasoningEffort, ) -> code_protocol::config_types::ReasoningEffort { @@ -2341,8 +3214,10 @@ fn rate_limit_snapshot_from_event( fn conversation_id_from_rollout_path(path: &std::path::Path) -> Option { let stem = path.file_stem()?.to_str()?; - let (_, id) = stem.rsplit_once('-')?; - ConversationId::from_string(id).ok() + + stem.match_indices('-') + .rev() + .find_map(|(index, _)| ConversationId::from_string(&stem[index + 1..]).ok()) } fn snippet_from_rollout_tail(tail: &[serde_json::Value]) -> Option { diff --git a/code-rs/app-server/src/message_processor.rs b/code-rs/app-server/src/message_processor.rs index be58452049a..c7643287b98 100644 --- a/code-rs/app-server/src/message_processor.rs +++ b/code-rs/app-server/src/message_processor.rs @@ -30,6 +30,9 @@ use code_app_server_protocol::ExternalAgentConfigImportParams; use code_app_server_protocol::GetAccountParams; use code_app_server_protocol::LoginAccountParams; use code_app_server_protocol::MergeStrategy; +use code_app_server_protocol::ModelListParams; +use code_app_server_protocol::ReviewStartParams; +use code_app_server_protocol::ThreadResumeParams; use code_app_server_protocol::ToolsV2; use code_app_server_protocol::AskForApproval as V2AskForApproval; use code_app_server_protocol::WriteStatus; @@ -273,6 +276,9 @@ impl MessageProcessor { let is_v2_request = matches!( request.method.as_str(), "config/read" + | "review/start" + | "model/list" + | "thread/resume" | "configRequirements/read" | "config/value/write" | "config/batchWrite" @@ -295,6 +301,66 @@ impl MessageProcessor { } match request.method.as_str() { + "review/start" => { + let params_value = request.params.clone().unwrap_or_else(|| json!({})); + let params: ReviewStartParams = 
match serde_json::from_value(params_value) { + Ok(params) => params, + Err(err) => { + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("Invalid review/start params: {err}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return true; + } + }; + + self.code_message_processor + .review_start_v2(request_id, params) + .await; + true + } + "model/list" => { + let params_value = request.params.clone().unwrap_or_else(|| json!({})); + let params: ModelListParams = match serde_json::from_value(params_value) { + Ok(params) => params, + Err(err) => { + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("Invalid model/list params: {err}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return true; + } + }; + + self.code_message_processor + .model_list_v2(request_id, params) + .await; + true + } + "thread/resume" => { + let params_value = request.params.clone().unwrap_or_else(|| json!({})); + let params: ThreadResumeParams = match serde_json::from_value(params_value) { + Ok(params) => params, + Err(err) => { + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("Invalid thread/resume params: {err}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return true; + } + }; + + self.code_message_processor + .thread_resume_v2(request_id, params) + .await; + true + } "config/read" => { let params_value = request.params.clone().unwrap_or_else(|| json!({})); let params: ConfigReadParams = match serde_json::from_value(params_value) { From 4442c0391e105e7fc262d96e65c3048a29b0f647 Mon Sep 17 00:00:00 2001 From: Chris Busillo Date: Fri, 3 Apr 2026 12:26:26 -0400 Subject: [PATCH 2/2] codex: address PR review feedback (#570) --- .../app-server/src/code_message_processor.rs | 157 ++++++++++++++++-- 1 file changed, 140 insertions(+), 17 deletions(-) diff --git 
a/code-rs/app-server/src/code_message_processor.rs b/code-rs/app-server/src/code_message_processor.rs index c3cd4bce9ac..e38fb9fb7de 100644 --- a/code-rs/app-server/src/code_message_processor.rs +++ b/code-rs/app-server/src/code_message_processor.rs @@ -1005,7 +1005,17 @@ impl CodexMessageProcessor { request_id: RequestId, params: ThreadResumeParams, ) { - let unsupported_history = params.history.is_some(); + if params.history.is_some() { + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: "thread/resume.history is not supported by the Every Code app-server" + .to_string(), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + } + let thread_id = params.thread_id.clone(); let catalog = SessionCatalog::new(self.config.code_home.clone()); @@ -1030,14 +1040,9 @@ impl CodexMessageProcessor { ) { Some(path) => path, None => { - let message = if unsupported_history { - "thread/resume.history is not supported by the Every Code app-server without a rollout path" - } else { - "thread not found" - }; let error = JSONRPCErrorError { code: INVALID_REQUEST_ERROR_CODE, - message: message.to_string(), + message: "thread not found".to_string(), data: None, }; self.outgoing.send_error(request_id, error).await; @@ -1091,11 +1096,8 @@ impl CodexMessageProcessor { session_configured: _, .. 
}) => { - let canonical_thread_id = thread_resume_canonical_thread_id( - &thread_id, - &rollout_path, - catalog_entry.as_ref(), - ); + let canonical_thread_id = + thread_resume_canonical_thread_id(conversation_id, &rollout_path, catalog_entry.as_ref()); let thread = thread_resume_response_thread( &canonical_thread_id, catalog_entry.as_ref(), @@ -1308,9 +1310,47 @@ impl CodexMessageProcessor { .await; } code_app_server_protocol::ReviewDelivery::Detached => { - let mut config = (*self.config).clone(); let catalog = SessionCatalog::new(self.config.code_home.clone()); - if let Ok(Some(entry)) = catalog.find_by_id(&thread_id).await { + let catalog_entry = match catalog.find_by_id(&thread_id).await { + Ok(entry) => entry, + Err(err) => { + self.outgoing + .send_error( + request_id, + JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: format!( + "failed to resolve detached review thread: {err}" + ), + data: None, + }, + ) + .await; + return; + } + }; + if catalog_entry.is_none() + && self + .conversation_manager + .get_conversation(resolved_thread_id) + .await + .is_err() + { + self.outgoing + .send_error( + request_id, + JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("thread not found: {thread_id}"), + data: None, + }, + ) + .await; + return; + } + + let mut config = (*self.config).clone(); + if let Some(entry) = catalog_entry { config.cwd = entry.cwd_real; } @@ -1857,14 +1897,14 @@ fn thread_resume_rollout_path( } fn thread_resume_canonical_thread_id( - requested_thread_id: &str, + resumed_conversation_id: ConversationId, rollout_path: &std::path::Path, entry: Option<&code_core::SessionIndexEntry>, ) -> String { conversation_id_from_rollout_path(rollout_path) .map(|conversation_id| conversation_id.to_string()) .or_else(|| entry.map(|item| item.session_id.to_string())) - .unwrap_or_else(|| requested_thread_id.to_string()) + .unwrap_or_else(|| resumed_conversation_id.to_string()) } fn thread_resume_response_thread( @@ -2573,6 +2613,8 
@@ impl IntoWireAuthMode for code_app_server_protocol::AuthMode { mod tests { use super::*; use code_app_server_protocol::AuthMode; + use code_app_server_protocol::ReviewDelivery; + use code_app_server_protocol::ReviewTarget; use code_core::auth::CodexAuth; use code_core::auth::RefreshTokenError; use code_core::config::ConfigOverrides; @@ -2641,6 +2683,16 @@ mod tests { ) } + async fn expect_error_message( + outgoing_rx: &mut mpsc::UnboundedReceiver<crate::outgoing_message::OutgoingMessage>, + ) -> String { + let message = outgoing_rx.recv().await.expect("error response should be sent"); + match message { + crate::outgoing_message::OutgoingMessage::Error(err) => err.error.message, + other => panic!("expected error response, got {other:?}"), + } + } + #[tokio::test] async fn remove_conversation_listener_enforces_owner_connection() { let (mut processor, mut outgoing_rx) = make_processor_for_tests(); @@ -2746,6 +2798,57 @@ mod tests { ); } + #[tokio::test] + async fn thread_resume_v2_rejects_history_even_with_path() { + let (processor, mut outgoing_rx) = make_processor_for_tests(); + + processor + .thread_resume_v2( + RequestId::Integer(7), + ThreadResumeParams { + thread_id: Uuid::new_v4().to_string(), + history: Some(Vec::new()), + path: Some(std::path::PathBuf::from("/tmp/rollout.jsonl")), + model: None, + model_provider: None, + cwd: None, + approval_policy: None, + sandbox: None, + config: None, + base_instructions: None, + developer_instructions: None, + personality: None, + }, + ) + .await; + + let message = expect_error_message(&mut outgoing_rx).await; + assert_eq!( + message, + "thread/resume.history is not supported by the Every Code app-server" + ); + } + + #[tokio::test] + async fn review_start_v2_detached_rejects_unknown_source_thread() { + let (processor, mut outgoing_rx) = make_processor_for_tests(); + let unknown_thread_id = Uuid::new_v4().to_string(); + + processor + .review_start_v2( + RequestId::Integer(8), + ReviewStartParams { + thread_id: unknown_thread_id.clone(), + target: 
ReviewTarget::UncommittedChanges, + delivery: Some(ReviewDelivery::Detached), + }, + ) + .await; + + let message = expect_error_message(&mut outgoing_rx).await; + assert_eq!(message, format!("thread not found: {unknown_thread_id}")); + } + #[test] fn parse_plan_type_is_case_insensitive() { assert_eq!(parse_plan_type(Some("Pro".to_string())), PlanType::Pro); @@ -2892,6 +2995,10 @@ mod tests { #[test] fn thread_resume_canonical_thread_id_prefers_rollout_path() { + let resumed_conversation_id = ConversationId::from_string( + "33333333-3333-4333-8333-333333333333", + ) + .expect("valid uuid"); let entry = SessionIndexEntry { session_id: Uuid::parse_str("11111111-1111-4111-8111-111111111111") .expect("valid uuid"), @@ -2916,7 +3023,7 @@ mod tests { }; let canonical_thread_id = thread_resume_canonical_thread_id( - &entry.session_id.to_string(), + resumed_conversation_id, std::path::Path::new( "/tmp/rollout-2026-04-03T09-10-00Z-22222222-2222-4222-8222-222222222222.jsonl", ), @@ -2926,6 +3033,22 @@ mod tests { assert_eq!(canonical_thread_id, "22222222-2222-4222-8222-222222222222"); } + #[test] + fn thread_resume_canonical_thread_id_falls_back_to_resumed_conversation() { + let resumed_conversation_id = ConversationId::from_string( + "33333333-3333-4333-8333-333333333333", + ) + .expect("valid uuid"); + + let canonical_thread_id = thread_resume_canonical_thread_id( + resumed_conversation_id, + std::path::Path::new("/tmp/rollout-without-uuid.jsonl"), + None, + ); + + assert_eq!(canonical_thread_id, "33333333-3333-4333-8333-333333333333"); + } + #[test] fn derive_config_from_params_applies_model_provider_override() { let params = NewConversationParams {