From 191e89fc9f76f8b1bf5e1319b5a19724266c73dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=82=BB=E6=A2=A6=E5=85=BD?= Date: Sun, 5 Apr 2026 14:23:46 +0800 Subject: [PATCH 1/3] refactor(runtime): snapshot session policy for session tooling Load allowed tools/skills/policy/mcp config once per session and centralize CLI wiring. --- crates/loopforge-cli/src/dispatch/agent.rs | 24 +- .../src/session_runner/chat_loop.rs | 9 +- .../rexos-runtime/src/session_skills/audit.rs | 6 +- .../src/session_skills/storage.rs | 228 ++++++++++++++++++ crates/rexos-runtime/src/workflow.rs | 6 +- 5 files changed, 255 insertions(+), 18 deletions(-) diff --git a/crates/loopforge-cli/src/dispatch/agent.rs b/crates/loopforge-cli/src/dispatch/agent.rs index e3e47d5..d8c45cd 100644 --- a/crates/loopforge-cli/src/dispatch/agent.rs +++ b/crates/loopforge-cli/src/dispatch/agent.rs @@ -17,15 +17,21 @@ pub(super) async fn run(command: AgentCommand) -> anyhow::Result<()> { Some(id) => id, None => rexos::harness::resolve_session_id(&workspace)?, }; - if !allowed_tools.is_empty() { - agent.set_session_allowed_tools(&session_id, allowed_tools)?; - } - if let Some(path) = mcp_config.as_ref() { - let raw = std::fs::read_to_string(path)?; - let json: serde_json::Value = - serde_json::from_str(&raw).map_err(|err| anyhow::anyhow!("{err}"))?; - agent.set_session_mcp_config(&session_id, serde_json::to_string(&json)?)?; - } + let allowed_tools = if allowed_tools.is_empty() { + None + } else { + Some(allowed_tools) + }; + let mcp_config_json = match mcp_config.as_ref() { + Some(path) => { + let raw = std::fs::read_to_string(path)?; + let json: serde_json::Value = + serde_json::from_str(&raw).map_err(|err| anyhow::anyhow!("{err}"))?; + Some(serde_json::to_string(&json)?) + } + None => None, + }; + agent.configure_session_tooling(&session_id, allowed_tools, mcp_config_json)?; let out = agent .run_session( workspace, diff --git a/crates/rexos-runtime/src/session_runner/chat_loop.rs b/crates/rexos-runtime/src/session_runner/chat_loop.rs index 88c6dc7..a6ea03c 100644 --- a/crates/rexos-runtime/src/session_runner/chat_loop.rs +++ b/crates/rexos-runtime/src/session_runner/chat_loop.rs @@ -19,16 +19,17 @@ impl AgentRuntime { user_prompt: &str, kind: TaskKind, ) -> anyhow::Result { - let allowed_tools = self.load_session_allowed_tools(session_id)?; - let allowed_lookup: Option> = allowed_tools + let mut policy = self.load_session_policy_snapshot(session_id)?; + let allowed_lookup: Option> = policy + .allowed_tools .as_ref() .map(|tools| tools.iter().cloned().collect()); - let mcp_config = self.load_session_mcp_config(session_id)?; + let allowed_tools = policy.allowed_tools.take(); let tools = Toolset::new_with_allowed_tools_security_and_mcp_config( workspace_root.clone(), allowed_tools, self.security.clone(), - mcp_config.as_deref(), + policy.mcp_config_json.as_deref(), ) .await?; let provider = self.router.provider_for(kind); diff --git a/crates/rexos-runtime/src/session_skills/audit.rs b/crates/rexos-runtime/src/session_skills/audit.rs index ba371e0..d026850 100644 --- a/crates/rexos-runtime/src/session_skills/audit.rs +++ b/crates/rexos-runtime/src/session_skills/audit.rs @@ -31,7 +31,9 @@ impl AgentRuntime { skill_name: &str, requested_permissions: &[String], ) -> anyhow::Result<()> { - if let Some(allowed_skills) = self.load_session_allowed_skills(session_id)? { + let session_policy = self.load_session_policy_snapshot(session_id)?; + + if let Some(allowed_skills) = session_policy.allowed_skills.as_ref() { if !allowed_skills .iter() .any(|skill| skill.eq_ignore_ascii_case(skill_name.trim())) @@ -52,7 +54,7 @@ impl AgentRuntime { } } - let policy: SessionSkillPolicy = self.load_session_skill_policy(session_id)?; + let policy: SessionSkillPolicy = session_policy.skill_policy; if !policy.allowlist.is_empty() && !policy .allowlist diff --git a/crates/rexos-runtime/src/session_skills/storage.rs b/crates/rexos-runtime/src/session_skills/storage.rs index 388970e..ff0f766 100644 --- a/crates/rexos-runtime/src/session_skills/storage.rs +++ b/crates/rexos-runtime/src/session_skills/storage.rs @@ -8,6 +8,14 @@ use crate::{ SESSION_SKILL_POLICY_KEY_PREFIX, }; +#[derive(Debug, Clone)] +pub(crate) struct SessionPolicySnapshot { + pub(crate) allowed_tools: Option>, + pub(crate) allowed_skills: Option>, + pub(crate) skill_policy: SessionSkillPolicy, + pub(crate) mcp_config_json: Option, +} + fn normalize_names(values: impl IntoIterator) -> Vec { let mut cleaned = Vec::new(); let mut seen = HashSet::new(); @@ -40,6 +48,33 @@ impl AgentRuntime { format!("{SESSION_SKILL_POLICY_KEY_PREFIX}{session_id}") } + pub(crate) fn load_session_policy_snapshot( + &self, + session_id: &str, + ) -> anyhow::Result { + Ok(SessionPolicySnapshot { + allowed_tools: self.load_session_allowed_tools(session_id)?, + allowed_skills: self.load_session_allowed_skills(session_id)?, + skill_policy: self.load_session_skill_policy(session_id)?, + mcp_config_json: self.load_session_mcp_config(session_id)?, + }) + } + + pub fn configure_session_tooling( + &self, + session_id: &str, + allowed_tools: Option>, + mcp_config_json: Option, + ) -> anyhow::Result<()> { + if let Some(tools) = allowed_tools { + self.set_session_allowed_tools(session_id, tools)?; + } + if let Some(config_json) = mcp_config_json { + self.set_session_mcp_config(session_id, config_json)?; + } + Ok(()) + } + pub fn set_session_allowed_tools( &self, session_id: &str, @@ -157,3 +192,196 @@ impl AgentRuntime { Ok(policy) } } + +#[cfg(test)] +mod tests { + use std::collections::BTreeMap; + + use rexos_kernel::config::{LlmConfig, ProviderConfig, ProviderKind, RexosConfig, RouteConfig}; + use rexos_kernel::paths::RexosPaths; + use rexos_kernel::router::{ModelRouter, TaskKind}; + use rexos_kernel::security::SecurityConfig; + use rexos_llm::registry::LlmRegistry; + use rexos_memory::MemoryStore; + + use crate::records::{WorkflowRunToolArgs, WorkflowStepToolArgs}; + use crate::AgentRuntime; + + fn build_agent(memory: MemoryStore) -> AgentRuntime { + let mut providers = BTreeMap::new(); + providers.insert( + "ollama".to_string(), + ProviderConfig { + kind: ProviderKind::OpenAiCompatible, + base_url: "http://127.0.0.1:11434/v1".to_string(), + api_key_env: "".to_string(), + default_model: "x".to_string(), + aws_bedrock: None, + }, + ); + + let security = SecurityConfig::default(); + let cfg = RexosConfig { + llm: LlmConfig::default(), + providers, + router: Default::default(), + security: security.clone(), + }; + let llms = LlmRegistry::from_config(&cfg).unwrap(); + let router = ModelRouter::new(rexos_kernel::config::RouterConfig { + planning: RouteConfig { + provider: "ollama".to_string(), + model: "x".to_string(), + }, + coding: RouteConfig { + provider: "ollama".to_string(), + model: "x".to_string(), + }, + summary: RouteConfig { + provider: "ollama".to_string(), + model: "x".to_string(), + }, + }); + AgentRuntime::new_with_security_config(memory, llms, router, security) + } + + #[test] + fn session_policy_snapshot_round_trips_and_normalizes() { + let tmp = tempfile::tempdir().unwrap(); + let paths = RexosPaths { + base_dir: tmp.path().join(".loopforge"), + }; + paths.ensure_dirs().unwrap(); + + let memory = MemoryStore::open_or_create(&paths).unwrap(); + let agent = build_agent(memory); + + agent + .set_session_allowed_tools( + "s1", + vec![ + " fs_read ".to_string(), + "".to_string(), + "fs_write".to_string(), + "fs_read".to_string(), + ], + ) + .unwrap(); + agent + .set_session_allowed_skills( + "s1", + vec![ + " safe-skill ".to_string(), + "safe-skill".to_string(), + "x".to_string(), + "".to_string(), + ], + ) + .unwrap(); + agent + .set_session_skill_policy( + "s1", + crate::SessionSkillPolicy { + allowlist: vec!["shell-helper".to_string()], + require_approval: true, + auto_approve_readonly: false, + }, + ) + .unwrap(); + agent + .set_session_mcp_config("s1", " {\"servers\":{}} ".to_string()) + .unwrap(); + + let snapshot = agent.load_session_policy_snapshot("s1").unwrap(); + assert_eq!( + snapshot.allowed_tools, + Some(vec!["fs_read".to_string(), "fs_write".to_string()]) + ); + assert_eq!( + snapshot.allowed_skills, + Some(vec!["safe-skill".to_string(), "x".to_string()]) + ); + assert_eq!( + snapshot.mcp_config_json, + Some("{\"servers\":{}}".to_string()) + ); + assert_eq!( + snapshot.skill_policy.allowlist, + vec!["shell-helper".to_string()] + ); + assert!(snapshot.skill_policy.require_approval); + assert!(!snapshot.skill_policy.auto_approve_readonly); + } + + #[test] + fn session_policy_snapshot_defaults_when_missing_or_blank() { + let tmp = tempfile::tempdir().unwrap(); + let paths = RexosPaths { + base_dir: tmp.path().join(".loopforge"), + }; + paths.ensure_dirs().unwrap(); + + let memory = MemoryStore::open_or_create(&paths).unwrap(); + let agent = build_agent(memory); + + agent + .set_session_mcp_config("s2", " ".to_string()) + .unwrap(); + let snapshot = agent.load_session_policy_snapshot("s2").unwrap(); + assert!(snapshot.allowed_tools.is_none()); + assert!(snapshot.allowed_skills.is_none()); + assert!(snapshot.mcp_config_json.is_none()); + assert!(!snapshot.skill_policy.auto_approve_readonly); + assert!(!snapshot.skill_policy.require_approval); + assert!(snapshot.skill_policy.allowlist.is_empty()); + } + + #[tokio::test] + async fn session_policy_workflow_uses_allowed_tools_snapshot() { + let tmp = tempfile::tempdir().unwrap(); + let workspace_root = tmp.path().join("workspace"); + std::fs::create_dir_all(&workspace_root).unwrap(); + + let paths = RexosPaths { + base_dir: tmp.path().join(".loopforge"), + }; + paths.ensure_dirs().unwrap(); + let memory = MemoryStore::open_or_create(&paths).unwrap(); + let agent = build_agent(memory); + + agent + .set_session_allowed_tools("s3", vec!["fs_read".to_string()]) + .unwrap(); + + let res = agent + .workflow_run( + &workspace_root, + "s3", + TaskKind::Coding, + WorkflowRunToolArgs { + workflow_id: Some("wf-policy".to_string()), + name: None, + steps: vec![WorkflowStepToolArgs { + tool: "fs_write".to_string(), + arguments: serde_json::json!({ + "path": "x.txt", + "content": "blocked", + }), + name: None, + approval_required: None, + }], + continue_on_error: None, + }, + ) + .await + .unwrap(); + + let res: serde_json::Value = serde_json::from_str(&res).unwrap(); + let saved_to = res["saved_to"].as_str().unwrap(); + let state_raw = std::fs::read_to_string(saved_to).unwrap(); + let state: serde_json::Value = serde_json::from_str(&state_raw).unwrap(); + let err = state["steps"][0]["error"].as_str().unwrap(); + assert!(err.contains("workflow step 0 (fs_write)"), "got: {err}"); + assert!(!workspace_root.join("x.txt").exists()); + } +} diff --git a/crates/rexos-runtime/src/workflow.rs b/crates/rexos-runtime/src/workflow.rs index cf03e44..5668749 100644 --- a/crates/rexos-runtime/src/workflow.rs +++ b/crates/rexos-runtime/src/workflow.rs @@ -45,13 +45,13 @@ impl AgentRuntime { let state_path = workflow_state_path(workspace_root, &workflow_id); self.write_workflow_state(&state_path, &state)?; - let allowed_tools = self.load_session_allowed_tools(session_id)?; - let mcp_config = self.load_session_mcp_config(session_id)?; + let mut policy = self.load_session_policy_snapshot(session_id)?; + let allowed_tools = policy.allowed_tools.take(); let tools = Toolset::new_with_allowed_tools_security_and_mcp_config( workspace_root.clone(), allowed_tools, self.security.clone(), - mcp_config.as_deref(), + policy.mcp_config_json.as_deref(), ) .await?; let continue_on_error = args.continue_on_error.unwrap_or(false); From 57081b543c202ed3a10cb12e92e517f9f0c6aebc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=82=BB=E6=A2=A6=E5=85=BD?= Date: Sun, 5 Apr 2026 14:24:10 +0800 Subject: [PATCH 2/3] refactor(tools): split MCP transport and routing Move connection/initialization and call/list/read forwarding into focused modules. --- crates/rexos-tools/src/mcp/mod.rs | 343 +----------------------- crates/rexos-tools/src/mcp/routing.rs | 186 +++++++++++++ crates/rexos-tools/src/mcp/tests.rs | 2 +- crates/rexos-tools/src/mcp/transport.rs | 182 +++++++++++++ 4 files changed, 381 insertions(+), 332 deletions(-) create mode 100644 crates/rexos-tools/src/mcp/routing.rs create mode 100644 crates/rexos-tools/src/mcp/transport.rs diff --git a/crates/rexos-tools/src/mcp/mod.rs b/crates/rexos-tools/src/mcp/mod.rs index d14fa21..dc66c2f 100644 --- a/crates/rexos-tools/src/mcp/mod.rs +++ b/crates/rexos-tools/src/mcp/mod.rs @@ -1,24 +1,23 @@ mod config; mod jsonrpc; +mod routing; mod stdio; +mod transport; mod types; #[cfg(test)] mod tests; -use std::collections::{BTreeMap, HashMap, HashSet}; -use std::hash::{Hash, Hasher}; +use std::collections::{BTreeMap, HashMap}; use std::path::Path; use std::sync::Arc; -use anyhow::{anyhow, Context}; -use rexos_llm::openai_compat::{ToolDefinition, ToolFunctionDefinition}; -use serde_json::Value; +use anyhow::Context; +use rexos_llm::openai_compat::ToolDefinition; pub(crate) use config::{McpServerConfig, McpServersConfig}; -use jsonrpc::JsonRpcClient; -use stdio::{spawn_stdio_server, StdioServer}; -use types::ToolsListResult; +use stdio::StdioServer; +use transport::ConnectedMcp; #[derive(Debug, Clone)] pub(crate) struct McpHub { @@ -54,57 +53,11 @@ impl McpHub { cfg: McpServersConfig, workspace_root: &Path, ) -> anyhow::Result { - if cfg.servers.is_empty() { - return Err(anyhow!("mcp config has no servers")); - } - - let mut servers: BTreeMap> = BTreeMap::new(); - let mut tool_targets: HashMap = HashMap::new(); - let mut tool_defs: Vec = Vec::new(); - let mut used_names: HashSet = HashSet::new(); - - for (name, server_cfg) in &cfg.servers { - let stdio = spawn_stdio_server(name, server_cfg, workspace_root).await?; - initialize(&stdio.client) - .await - .with_context(|| format!("mcp initialize: {name}"))?; - let server = Arc::new(McpServer { - name: name.clone(), - stdio, - }); - - let tools = list_all_tools(&server.stdio.client) - .await - .with_context(|| format!("mcp tools/list: {name}"))?; - - for tool in tools { - let local = allocate_local_tool_name(name, &tool.name, &mut used_names); - tool_targets.insert( - local.clone(), - McpToolTarget { - server: name.clone(), - remote_name: tool.name.clone(), - }, - ); - - tool_defs.push(ToolDefinition { - kind: "function".to_string(), - function: ToolFunctionDefinition { - name: local, - description: tool - .description - .unwrap_or_else(|| format!("MCP tool '{name}::{}'", tool.name)), - parameters: if tool.input_schema.is_null() { - serde_json::json!({ "type": "object" }) - } else { - tool.input_schema - }, - }, - }); - } - - servers.insert(name.clone(), server); - } + let ConnectedMcp { + servers, + tool_targets, + tool_defs, + } = transport::connect(cfg, workspace_root).await?; Ok(Self { servers, @@ -120,276 +73,4 @@ impl McpHub { pub(crate) fn server_names(&self) -> Vec { self.servers.keys().cloned().collect() } - - pub(crate) async fn call_tool( - &self, - local_name: &str, - arguments: Value, - ) -> anyhow::Result { - let target = self - .tool_targets - .get(local_name) - .ok_or_else(|| anyhow!("unknown mcp tool: {local_name}"))? - .clone(); - - let server = self - .servers - .get(&target.server) - .ok_or_else(|| anyhow!("unknown mcp server: {}", target.server))?; - - server - .stdio - .client - .request( - "tools/call", - Some(serde_json::json!({ - "name": target.remote_name, - "arguments": arguments, - })), - ) - .await - } - - pub(crate) async fn resources_list( - &self, - server: Option<&str>, - cursor: Option<&str>, - ) -> anyhow::Result { - self.forward_list_request("resources/list", "resources", server, cursor) - .await - } - - pub(crate) async fn prompts_list( - &self, - server: Option<&str>, - cursor: Option<&str>, - ) -> anyhow::Result { - self.forward_list_request("prompts/list", "prompts", server, cursor) - .await - } - - async fn forward_list_request( - &self, - method: &str, - field: &str, - server: Option<&str>, - cursor: Option<&str>, - ) -> anyhow::Result { - let cursor = cursor.map(|c| c.trim()).filter(|c| !c.is_empty()); - let params = cursor.map(|cursor| serde_json::json!({ "cursor": cursor })); - - match server.map(|s| s.trim()).filter(|s| !s.is_empty()) { - Some(name) => { - let server = self - .servers - .get(name) - .ok_or_else(|| anyhow!("unknown mcp server: {name}"))?; - let result = server.stdio.client.request(method, params).await?; - Ok(serde_json::json!({ "server": name, "result": result })) - } - None => { - let mut all: Vec = Vec::new(); - for name in self.servers.keys() { - let server = self - .servers - .get(name) - .ok_or_else(|| anyhow!("unknown mcp server: {name}"))?; - let result = server.stdio.client.request(method, params.clone()).await?; - let items = result.get(field).cloned().unwrap_or(Value::Null); - all.push(serde_json::json!({ "server": name, field: items, "nextCursor": result.get("nextCursor") })); - } - Ok(Value::Array(all)) - } - } - } - - pub(crate) async fn resources_read( - &self, - server: Option<&str>, - uri: &str, - ) -> anyhow::Result { - let uri = uri.trim(); - if uri.is_empty() { - return Err(anyhow!("mcp_resources_read: uri is empty")); - } - - match server.map(|s| s.trim()).filter(|s| !s.is_empty()) { - Some(name) => { - let server = self - .servers - .get(name) - .ok_or_else(|| anyhow!("unknown mcp server: {name}"))?; - let result = server - .stdio - .client - .request("resources/read", Some(serde_json::json!({ "uri": uri }))) - .await?; - Ok(serde_json::json!({ "server": name, "result": result })) - } - None => { - for name in self.servers.keys() { - let server = self - .servers - .get(name) - .ok_or_else(|| anyhow!("unknown mcp server: {name}"))?; - let res = server - .stdio - .client - .request("resources/read", Some(serde_json::json!({ "uri": uri }))) - .await; - if let Ok(result) = res { - return Ok(serde_json::json!({ "server": name, "result": result })); - } - } - Err(anyhow!("mcp_resources_read: no server handled uri: {uri}")) - } - } - } - - pub(crate) async fn prompts_get( - &self, - server: Option<&str>, - name: &str, - arguments: Option, - ) -> anyhow::Result { - let name = name.trim(); - if name.is_empty() { - return Err(anyhow!("mcp_prompts_get: name is empty")); - } - - let mut params = serde_json::Map::new(); - params.insert("name".to_string(), Value::String(name.to_string())); - if let Some(arguments) = arguments { - params.insert("arguments".to_string(), arguments); - } - let params = Value::Object(params); - - match server.map(|s| s.trim()).filter(|s| !s.is_empty()) { - Some(server_name) => { - let server = self - .servers - .get(server_name) - .ok_or_else(|| anyhow!("unknown mcp server: {server_name}"))?; - let result = server - .stdio - .client - .request("prompts/get", Some(params)) - .await?; - Ok(serde_json::json!({ "server": server_name, "result": result })) - } - None => { - for server_name in self.servers.keys() { - let server = self - .servers - .get(server_name) - .ok_or_else(|| anyhow!("unknown mcp server: {server_name}"))?; - let res = server - .stdio - .client - .request("prompts/get", Some(params.clone())) - .await; - if let Ok(result) = res { - return Ok(serde_json::json!({ "server": server_name, "result": result })); - } - } - Err(anyhow!("mcp_prompts_get: no server handled prompt: {name}")) - } - } - } -} - -async fn initialize(client: &JsonRpcClient) -> anyhow::Result<()> { - // Try a small set of known protocol revisions (latest-first) for broad compatibility. - const VERSIONS: [&str; 3] = ["2025-11-25", "2025-03-26", "2024-11-05"]; - - let params_base = |protocol: &str| { - serde_json::json!({ - "protocolVersion": protocol, - "capabilities": {}, - "clientInfo": { - "name": "loopforge", - "version": env!("CARGO_PKG_VERSION"), - } - }) - }; - - let mut last_err: Option = None; - for v in VERSIONS { - match client.request("initialize", Some(params_base(v))).await { - Ok(_) => { - client.notify("initialized", None).await?; - return Ok(()); - } - Err(err) => { - last_err = Some(err); - } - } - } - - Err(last_err.unwrap_or_else(|| anyhow!("mcp initialize failed"))) -} - -async fn list_all_tools(client: &JsonRpcClient) -> anyhow::Result> { - let mut cursor: Option = None; - let mut out: Vec = Vec::new(); - for _ in 0..32usize { - let params = cursor - .as_deref() - .map(|cursor| serde_json::json!({ "cursor": cursor })); - let value = client.request("tools/list", params).await?; - let parsed: ToolsListResult = - serde_json::from_value(value).context("decode tools/list result")?; - out.extend(parsed.tools.into_iter()); - cursor = parsed.next_cursor; - if cursor.as_deref().unwrap_or("").trim().is_empty() { - break; - } - } - Ok(out) -} - -fn allocate_local_tool_name(server: &str, tool: &str, used: &mut HashSet) -> String { - let server_part = sanitize_component(server); - let tool_part = sanitize_component(tool); - let mut candidate = format!("mcp_{server_part}__{tool_part}"); - - if candidate.len() > 64 { - let hash = short_hash(&candidate); - let suffix = format!("_{hash:08x}"); - candidate.truncate(64usize.saturating_sub(suffix.len())); - candidate.push_str(&suffix); - } - - if used.insert(candidate.clone()) { - return candidate; - } - - let hash = short_hash(&format!("{server}\0{tool}")); - let suffix = format!("_{hash:08x}"); - let mut out = candidate; - if out.len() + suffix.len() > 64 { - out.truncate(64usize.saturating_sub(suffix.len())); - } - out.push_str(&suffix); - used.insert(out.clone()); - out -} - -fn sanitize_component(value: &str) -> String { - let mut out = String::with_capacity(value.len()); - for c in value.chars() { - let c = if c.is_ascii_alphanumeric() || c == '_' || c == '-' { - c.to_ascii_lowercase() - } else { - '_' - }; - out.push(c); - } - out.trim_matches('_').to_string() -} - -fn short_hash(value: &str) -> u64 { - let mut h = std::collections::hash_map::DefaultHasher::new(); - value.hash(&mut h); - h.finish() } diff --git a/crates/rexos-tools/src/mcp/routing.rs b/crates/rexos-tools/src/mcp/routing.rs new file mode 100644 index 0000000..52696a0 --- /dev/null +++ b/crates/rexos-tools/src/mcp/routing.rs @@ -0,0 +1,186 @@ +use anyhow::anyhow; +use serde_json::Value; + +use super::McpHub; + +impl McpHub { + pub(crate) async fn call_tool( + &self, + local_name: &str, + arguments: Value, + ) -> anyhow::Result { + let target = self + .tool_targets + .get(local_name) + .ok_or_else(|| anyhow!("unknown mcp tool: {local_name}"))? + .clone(); + + let server = self + .servers + .get(&target.server) + .ok_or_else(|| anyhow!("unknown mcp server: {}", target.server))?; + + server + .stdio + .client + .request( + "tools/call", + Some(serde_json::json!({ + "name": target.remote_name, + "arguments": arguments, + })), + ) + .await + } + + pub(crate) async fn resources_list( + &self, + server: Option<&str>, + cursor: Option<&str>, + ) -> anyhow::Result { + self.forward_list_request("resources/list", "resources", server, cursor) + .await + } + + pub(crate) async fn prompts_list( + &self, + server: Option<&str>, + cursor: Option<&str>, + ) -> anyhow::Result { + self.forward_list_request("prompts/list", "prompts", server, cursor) + .await + } + + async fn forward_list_request( + &self, + method: &str, + field: &str, + server: Option<&str>, + cursor: Option<&str>, + ) -> anyhow::Result { + let cursor = cursor.map(|c| c.trim()).filter(|c| !c.is_empty()); + let params = cursor.map(|cursor| serde_json::json!({ "cursor": cursor })); + + match server.map(|s| s.trim()).filter(|s| !s.is_empty()) { + Some(name) => { + let server = self + .servers + .get(name) + .ok_or_else(|| anyhow!("unknown mcp server: {name}"))?; + let result = server.stdio.client.request(method, params).await?; + Ok(serde_json::json!({ "server": name, "result": result })) + } + None => { + let mut all: Vec = Vec::new(); + for name in self.servers.keys() { + let server = self + .servers + .get(name) + .ok_or_else(|| anyhow!("unknown mcp server: {name}"))?; + let result = server.stdio.client.request(method, params.clone()).await?; + let items = result.get(field).cloned().unwrap_or(Value::Null); + all.push(serde_json::json!({ + "server": name, + (field): items, + "nextCursor": result.get("nextCursor"), + })); + } + Ok(Value::Array(all)) + } + } + } + + pub(crate) async fn resources_read( + &self, + server: Option<&str>, + uri: &str, + ) -> anyhow::Result { + let uri = uri.trim(); + if uri.is_empty() { + return Err(anyhow!("mcp_resources_read: uri is empty")); + } + + match server.map(|s| s.trim()).filter(|s| !s.is_empty()) { + Some(name) => { + let server = self + .servers + .get(name) + .ok_or_else(|| anyhow!("unknown mcp server: {name}"))?; + let result = server + .stdio + .client + .request("resources/read", Some(serde_json::json!({ "uri": uri }))) + .await?; + Ok(serde_json::json!({ "server": name, "result": result })) + } + None => { + for name in self.servers.keys() { + let server = self + .servers + .get(name) + .ok_or_else(|| anyhow!("unknown mcp server: {name}"))?; + let res = server + .stdio + .client + .request("resources/read", Some(serde_json::json!({ "uri": uri }))) + .await; + if let Ok(result) = res { + return Ok(serde_json::json!({ "server": name, "result": result })); + } + } + Err(anyhow!("mcp_resources_read: no server handled uri: {uri}")) + } + } + } + + pub(crate) async fn prompts_get( + &self, + server: Option<&str>, + name: &str, + arguments: Option, + ) -> anyhow::Result { + let name = name.trim(); + if name.is_empty() { + return Err(anyhow!("mcp_prompts_get: name is empty")); + } + + let mut params = serde_json::Map::new(); + params.insert("name".to_string(), Value::String(name.to_string())); + if let Some(arguments) = arguments { + params.insert("arguments".to_string(), arguments); + } + let params = Value::Object(params); + + match server.map(|s| s.trim()).filter(|s| !s.is_empty()) { + Some(server_name) => { + let server = self + .servers + .get(server_name) + .ok_or_else(|| anyhow!("unknown mcp server: {server_name}"))?; + let result = server + .stdio + .client + .request("prompts/get", Some(params)) + .await?; + Ok(serde_json::json!({ "server": server_name, "result": result })) + } + None => { + for server_name in self.servers.keys() { + let server = self + .servers + .get(server_name) + .ok_or_else(|| anyhow!("unknown mcp server: {server_name}"))?; + let res = server + .stdio + .client + .request("prompts/get", Some(params.clone())) + .await; + if let Ok(result) = res { + return Ok(serde_json::json!({ "server": server_name, "result": result })); + } + } + Err(anyhow!("mcp_prompts_get: no server handled prompt: {name}")) + } + } + } +} diff --git a/crates/rexos-tools/src/mcp/tests.rs b/crates/rexos-tools/src/mcp/tests.rs index 340a998..0fd2da4 100644 --- a/crates/rexos-tools/src/mcp/tests.rs +++ b/crates/rexos-tools/src/mcp/tests.rs @@ -1,6 +1,6 @@ use std::collections::HashSet; -use super::*; +use super::transport::{allocate_local_tool_name, sanitize_component}; #[test] fn sanitize_component_rewrites_non_ascii_to_underscores() { diff --git a/crates/rexos-tools/src/mcp/transport.rs b/crates/rexos-tools/src/mcp/transport.rs new file mode 100644 index 0000000..36da203 --- /dev/null +++ b/crates/rexos-tools/src/mcp/transport.rs @@ -0,0 +1,182 @@ +use std::collections::{BTreeMap, HashMap, HashSet}; +use std::hash::{Hash, Hasher}; +use std::path::Path; +use std::sync::Arc; + +use anyhow::{anyhow, Context}; +use rexos_llm::openai_compat::{ToolDefinition, ToolFunctionDefinition}; + +use super::config::McpServersConfig; +use super::jsonrpc::JsonRpcClient; +use super::stdio::spawn_stdio_server; +use super::types::{self, ToolsListResult}; +use super::{McpServer, McpToolTarget}; + +pub(super) struct ConnectedMcp { + pub(super) servers: BTreeMap>, + pub(super) tool_targets: HashMap, + pub(super) tool_defs: Vec, +} + +pub(super) async fn connect( + cfg: McpServersConfig, + workspace_root: &Path, +) -> anyhow::Result { + if cfg.servers.is_empty() { + return Err(anyhow!("mcp config has no servers")); + } + + let mut servers: BTreeMap> = BTreeMap::new(); + let mut tool_targets: HashMap = HashMap::new(); + let mut tool_defs: Vec = Vec::new(); + let mut used_names: HashSet = HashSet::new(); + + for (name, server_cfg) in &cfg.servers { + let stdio = spawn_stdio_server(name, server_cfg, workspace_root).await?; + initialize(&stdio.client) + .await + .with_context(|| format!("mcp initialize: {name}"))?; + let server = Arc::new(McpServer { + name: name.clone(), + stdio, + }); + + let tools = list_all_tools(&server.stdio.client) + .await + .with_context(|| format!("mcp tools/list: {name}"))?; + + for tool in tools { + let local = allocate_local_tool_name(name, &tool.name, &mut used_names); + tool_targets.insert( + local.clone(), + McpToolTarget { + server: name.clone(), + remote_name: tool.name.clone(), + }, + ); + + tool_defs.push(ToolDefinition { + kind: "function".to_string(), + function: ToolFunctionDefinition { + name: local, + description: tool + .description + .unwrap_or_else(|| format!("MCP tool '{name}::{}'", tool.name)), + parameters: if tool.input_schema.is_null() { + serde_json::json!({ "type": "object" }) + } else { + tool.input_schema + }, + }, + }); + } + + servers.insert(name.clone(), server); + } + + Ok(ConnectedMcp { + servers, + tool_targets, + tool_defs, + }) +} + +async fn initialize(client: &JsonRpcClient) -> anyhow::Result<()> { + // Try a small set of known protocol revisions (latest-first) for broad compatibility. + const VERSIONS: [&str; 3] = ["2025-11-25", "2025-03-26", "2024-11-05"]; + + let params_base = |protocol: &str| { + serde_json::json!({ + "protocolVersion": protocol, + "capabilities": {}, + "clientInfo": { + "name": "loopforge", + "version": env!("CARGO_PKG_VERSION"), + } + }) + }; + + let mut last_err: Option = None; + for v in VERSIONS { + match client.request("initialize", Some(params_base(v))).await { + Ok(_) => { + client.notify("initialized", None).await?; + return Ok(()); + } + Err(err) => { + last_err = Some(err); + } + } + } + + Err(last_err.unwrap_or_else(|| anyhow!("mcp initialize failed"))) +} + +async fn list_all_tools(client: &JsonRpcClient) -> anyhow::Result> { + let mut cursor: Option = None; + let mut out: Vec = Vec::new(); + for _ in 0..32usize { + let params = cursor + .as_deref() + .map(|cursor| serde_json::json!({ "cursor": cursor })); + let value = client.request("tools/list", params).await?; + let parsed: ToolsListResult = + serde_json::from_value(value).context("decode tools/list result")?; + out.extend(parsed.tools.into_iter()); + cursor = parsed.next_cursor; + if cursor.as_deref().unwrap_or("").trim().is_empty() { + break; + } + } + Ok(out) +} + +pub(super) fn allocate_local_tool_name( + server: &str, + tool: &str, + used: &mut HashSet, +) -> String { + let server_part = sanitize_component(server); + let tool_part = sanitize_component(tool); + let mut candidate = format!("mcp_{server_part}__{tool_part}"); + + if candidate.len() > 64 { + let hash = short_hash(&candidate); + let suffix = format!("_{hash:08x}"); + candidate.truncate(64usize.saturating_sub(suffix.len())); + candidate.push_str(&suffix); + } + + if used.insert(candidate.clone()) { + return candidate; + } + + let hash = short_hash(&format!("{server}\0{tool}")); + let suffix = format!("_{hash:08x}"); + let mut out = candidate; + if out.len() + suffix.len() > 64 { + out.truncate(64usize.saturating_sub(suffix.len())); + } + out.push_str(&suffix); + used.insert(out.clone()); + out +} + +pub(super) fn sanitize_component(value: &str) -> String { + let mut out = String::with_capacity(value.len()); + for c in value.chars() { + let c = if c.is_ascii_alphanumeric() || c == '_' || c == '-' { + c.to_ascii_lowercase() + } else { + '_' + }; + out.push(c); + } + out.trim_matches('_').to_string() +} + +fn short_hash(value: &str) -> u64 { + let mut h = std::collections::hash_map::DefaultHasher::new(); + value.hash(&mut h); + h.finish() +} From a79e8f28081588ef708b9c0671a4a26214e6355e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=82=BB=E6=A2=A6=E5=85=BD?= Date: Sun, 5 Apr 2026 14:35:38 +0800 Subject: [PATCH 3/3] chore(tools): include MCP stderr tail in connect errors --- crates/rexos-tools/src/mcp/transport.rs | 37 +++++++++++++++++++++---- 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/crates/rexos-tools/src/mcp/transport.rs b/crates/rexos-tools/src/mcp/transport.rs index 36da203..a940a4f 100644 --- a/crates/rexos-tools/src/mcp/transport.rs +++ b/crates/rexos-tools/src/mcp/transport.rs @@ -33,18 +33,26 @@ pub(super) async fn connect( for (name, server_cfg) in &cfg.servers { let stdio = spawn_stdio_server(name, server_cfg, workspace_root).await?; - initialize(&stdio.client) + if let Err(err) = initialize(&stdio.client) .await - .with_context(|| format!("mcp initialize: {name}"))?; + .with_context(|| format!("mcp initialize: {name}")) + { + return Err(append_stderr_tail_context(err, name, &stdio).await); + } + + let tools = match list_all_tools(&stdio.client) + .await + .with_context(|| format!("mcp tools/list: {name}")) + { + Ok(tools) => tools, + Err(err) => return Err(append_stderr_tail_context(err, name, &stdio).await), + }; + let server = Arc::new(McpServer { name: name.clone(), stdio, }); - let tools = list_all_tools(&server.stdio.client) - .await - .with_context(|| format!("mcp tools/list: {name}"))?; - for tool in tools { let local = allocate_local_tool_name(name, &tool.name, &mut used_names); tool_targets.insert( @@ -81,6 +89,23 @@ pub(super) async fn connect( }) } +async fn append_stderr_tail_context( + err: anyhow::Error, + name: &str, + stdio: &super::stdio::StdioServer, +) -> anyhow::Error { + let tail = stdio.stderr_tail().lock().await.clone(); + if tail.is_empty() { + return err; + } + + err.context(format!( + "mcp server '{name}' stderr tail (last {} lines):\n{}", + tail.len(), + tail.join("\n") + )) +} + async fn initialize(client: &JsonRpcClient) -> anyhow::Result<()> { // Try a small set of known protocol revisions (latest-first) for broad compatibility. const VERSIONS: [&str; 3] = ["2025-11-25", "2025-03-26", "2024-11-05"];