From d32609d2a89985f94c7d3e6c710d39cdf6706a93 Mon Sep 17 00:00:00 2001 From: linhdmn Date: Fri, 24 Apr 2026 14:36:07 +0700 Subject: [PATCH 1/4] feat: add MCP HTTP transport for remote MCP server Add mcp-http CLI command that starts LeanKG MCP server with Streamable HTTP transport per MCP spec. Features: - POST /mcp endpoint for JSON-RPC requests - GET /mcp/stream endpoint for SSE streaming - GET /health endpoint for health checks - Bearer token authentication (--auth flag) - CORS headers for browser-based clients - Default port 9699 (configurable via --port or MCP_HTTP_PORT env var) - Watch mode support (--watch flag) This enables LeanKG to serve multiple concurrent MCP clients remotely, complementing the existing stdio transport for local use. --- src/cli/mod.rs | 12 ++ src/main.rs | 35 +++++ src/mcp/server.rs | 319 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 366 insertions(+) diff --git a/src/cli/mod.rs b/src/cli/mod.rs index d1ec8ce..0091525 100644 --- a/src/cli/mod.rs +++ b/src/cli/mod.rs @@ -58,6 +58,18 @@ pub enum CLICommand { #[arg(long)] watch: bool, }, + /// Start MCP server with HTTP transport (for remote clients) + McpHttp { + /// Port to listen on (default: 9699) + #[arg(long)] + port: Option, + /// Bearer token for authentication (optional) + #[arg(long)] + auth: Option, + /// Enable auto-indexing with file watcher + #[arg(long)] + watch: bool, + }, /// Calculate impact radius Impact { /// File to analyze diff --git a/src/main.rs b/src/main.rs index ef3bc9d..67200d5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -137,6 +137,41 @@ async fn main() -> Result<(), Box> { eprintln!("MCP stdio server error: {}", e); } } + cli::CLICommand::McpHttp { port, auth, watch } => { + let project_path = find_project_root()?; + let db_path = project_path.join(".leankg"); + let port = port.unwrap_or_else(|| { + std::env::var("MCP_HTTP_PORT") + .ok() + .and_then(|p| p.parse().ok()) + .unwrap_or(9699) + }); + let auth_token = auth.or_else(|| std::env::var("MCP_HTTP_AUTH").ok()); + + tokio::fs::create_dir_all(&db_path).await.ok(); + + let mcp_server = if watch { + mcp::MCPServer::new_with_watch(db_path.clone(), project_path.clone()) + } else { + mcp::MCPServer::new(db_path.clone()) + }; + + println!("╔═══════════════════════════════════════════════════════════════╗"); + println!("║ LeanKG MCP HTTP Server (Remote Mode) ║"); + println!("╚═══════════════════════════════════════════════════════════════╝"); + println!(); + println!("🚀 Starting MCP HTTP server on http://localhost:{}", port); + if auth_token.is_some() { + println!("🔒 Authentication: enabled"); + } else { + println!("🔓 Authentication: disabled (not recommended for production)"); + } + println!(); + + if let Err(e) = mcp_server.serve_http(port, auth_token).await { + eprintln!("MCP HTTP server error: {}", e); + } + } cli::CLICommand::Impact { file, depth } => { let project_path = find_project_root()?; let db_path = project_path.join(".leankg"); diff --git a/src/mcp/server.rs b/src/mcp/server.rs index 031689e..621c833 100644 --- a/src/mcp/server.rs +++ b/src/mcp/server.rs @@ -7,15 +7,26 @@ use crate::mcp::tools::ToolRegistry; use crate::mcp::tracker::WriteTracker; use crate::mcp::watcher::start_watcher; use crate::orchestrator::intent::IntentParser; +use axum::{ + body::Body, + extract::State, + http::{header, HeaderMap, Method, StatusCode}, + response::Response, + routing::{get, post}, + Router, +}; +// use futures_util::StreamExt; // Reserved for future streaming support use parking_lot::RwLock; use rmcp::handler::server::ServerHandler; use rmcp::model::{CallToolRequestParams, CallToolResult, Content, ListToolsResult, Tool}; use rmcp::service::{serve_server, RoleServer}; use rmcp::transport::stdio; +use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; use tokio::sync::RwLock as TokioRwLock; +use tower_http::cors::{Any, CorsLayer}; pub struct MCPServer { auth_config: Arc>, @@ -290,6 +301,62 @@ impl MCPServer { futures_util::future::pending().await } + pub async fn serve_http( + &self, + port: u16, + auth_token: Option, + ) -> Result<(), Box> { + if let Err(e) = self.auto_init_if_needed().await { + tracing::warn!( + "Auto-init skipped: {}. Server will operate in uninitialized state.", + e + ); + } + + if let Some(ref watch_path) = self.watch_path { + let db_path = self.get_db_path(); + let watch_path = watch_path.clone(); + tokio::spawn(async move { + let (tx, rx) = tokio::sync::mpsc::channel(100); + start_watcher(db_path, watch_path, rx).await; + let _ = tx; // silence unused warning + }); + tracing::info!( + "Auto-indexing enabled for {}", + self.watch_path + .as_ref() + .unwrap_or(&std::path::PathBuf::from("?")) + .display() + ); + } + + let server = Arc::new(HttpMcpServer { + mcp_server: self.clone(), + auth_token, + }); + + let cors = CorsLayer::new() + .allow_origin(Any) + .allow_methods([Method::GET, Method::POST, Method::OPTIONS]) + .allow_headers(Any) + .expose_headers([header::CONTENT_TYPE]); + + let app = Router::new() + .route("/mcp", post(handle_mcp_request)) + .route("/mcp/stream", get(handle_sse_stream)) + .route("/health", get(health_check)) + .layer(cors) + .with_state(server); + + let addr = std::net::SocketAddr::from(([0, 0, 0, 0], port)); + let listener = tokio::net::TcpListener::bind(addr).await?; + tracing::info!("MCP HTTP server listening on http://{}", addr); + + axum::serve(listener, app).await?; + + Ok(()) + } + async fn auto_init_if_needed(&self) -> Result<(), String> { let project_root = self.find_project_root()?; @@ -758,6 +825,258 @@ impl ServerHandler for MCPServer { } } +// ============================================================================ +// HTTP Transport for Remote MCP Server +// ============================================================================ + +/// HTTP MCP Server state shared across requests +struct HttpMcpServer { + mcp_server: MCPServer, + auth_token: Option, +} + +/// MCP JSON-RPC request envelope +#[derive(Debug, Serialize, Deserialize)] +struct JsonRpcRequest { + jsonrpc: String, + id: serde_json::Value, + method: String, + params: Option, +} + +/// MCP JSON-RPC response envelope +#[derive(Debug, Serialize)] +struct JsonRpcResponse { + jsonrpc: String, + id: serde_json::Value, + result: Option, + error: Option, +} + +#[derive(Debug, Serialize)] +struct JsonRpcError { + code: i32, + message: String, + data: Option, +} + +/// MCP JSON-RPC error codes +mod json_rpc_code { + pub const PARSE_ERROR: i32 = -32700; + pub const INVALID_REQUEST: i32 = -32600; + pub const METHOD_NOT_FOUND: i32 = -32601; + pub const INVALID_PARAMS: i32 = -32602; + pub const INTERNAL_ERROR: i32 = -32603; +} + +/// SSE event types for MCP protocol +#[derive(Debug, Clone, Copy)] +enum SseEventType { + Message, + Endpoint, +} + +/// Extract bearer token from Authorization header +fn extract_bearer_token(auth_header: Option<&str>, token: &Option) -> bool { + if token.is_none() { + return true; // No auth required + } + let token = token.as_ref().unwrap(); + + if let Some(auth) = auth_header { + if let Some(stripped) = auth.strip_prefix("Bearer ") { + return stripped == token.as_str(); + } + } + false +} + +/// Handle POST /mcp - JSON-RPC request endpoint +async fn handle_mcp_request( + State(server): State>, + headers: HeaderMap, + body: String, +) -> Response { + // Extract Authorization header + let auth_value = headers + .get(header::AUTHORIZATION) + .and_then(|v| v.to_str().ok()); + + // Check authentication + if !extract_bearer_token(auth_value, &server.auth_token) { + return Response::builder() + .status(StatusCode::UNAUTHORIZED) + .body(Body::from(r#"{"error": "Unauthorized"}"#)) + .unwrap(); + } + + // Parse JSON-RPC request + let request: JsonRpcRequest = match serde_json::from_str(&body) { + Ok(req) => req, + Err(e) => { + let response = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + id: serde_json::Value::Null, + result: None, + error: Some(JsonRpcError { + code: json_rpc_code::PARSE_ERROR, + message: format!("Parse error: {}", e), + data: None, + }), + }; + return Response::builder() + .status(StatusCode::OK) + .header(header::CONTENT_TYPE, "application/json") + .body(Body::from(serde_json::to_string(&response).unwrap())) + .unwrap(); + } + }; + + // Process the request + let result = process_jsonrpc_request(&server.mcp_server, &request).await; + + // Build response + let response = match result { + Ok(result) => JsonRpcResponse { + jsonrpc: "2.0".to_string(), + id: request.id, + result: Some(result), + error: None, + }, + Err(e) => JsonRpcResponse { + jsonrpc: "2.0".to_string(), + id: request.id, + result: None, + error: Some(JsonRpcError { + code: json_rpc_code::INTERNAL_ERROR, + message: e, + data: None, + }), + }, + }; + + Response::builder() + .status(StatusCode::OK) + .header(header::CONTENT_TYPE, "application/json") + .body(Body::from(serde_json::to_string(&response).unwrap())) + .unwrap() +} + +/// Process a JSON-RPC request and return the result +async fn process_jsonrpc_request( + mcp_server: &MCPServer, + request: &JsonRpcRequest, +) -> Result { + let method = &request.method; + let params = request.params.as_ref(); + + match method.as_str() { + "initialize" => Ok(serde_json::json!({ + "protocolVersion": "2025-06-18", + "capabilities": { + "tools": { "listChanged": true }, + "resources": {} + }, + "serverInfo": { + "name": "leankg", + "version": env!("CARGO_PKG_VERSION") + } + })), + "notifications/initialized" => { + // Client is done initializing, no response needed + Ok(serde_json::Value::Null) + } + "tools/list" => { + let tools = ToolRegistry::list_tools(); + let rmcp_tools: Vec = tools + .into_iter() + .map(|t| { + serde_json::json!({ + "name": t.name, + "description": t.description, + "inputSchema": t.input_schema + }) + }) + .collect(); + Ok(serde_json::json!({ "tools": rmcp_tools })) + } + "tools/call" => { + let params_obj = params + .and_then(|p| p.as_object()) + .ok_or("Missing params for tools/call")?; + + let tool_name = params_obj + .get("name") + .and_then(|v| v.as_str()) + .ok_or("Missing tool name")?; + + let arguments = params_obj + .get("arguments") + .and_then(|v| v.as_object()) + .cloned() + .unwrap_or_default(); + + let result = mcp_server + .execute_tool(tool_name, arguments) + .await + .map_err(|e| e.to_string())?; + + // Format as MCP tool result + let content_str = if let Some(s) = result.as_str() { + s.to_string() + } else { + crate::mcp::toon::wrap_response(tool_name, &result, true) + }; + + Ok(serde_json::json!({ + "content": [{ "type": "text", "text": content_str }] + })) + } + _ => Err(format!("Method not found: {}", method)), + } +} + +/// Handle GET /mcp/stream - SSE endpoint for server-initiated messages +async fn handle_sse_stream( + State(server): State>, + headers: HeaderMap, +) -> Response { + // Extract Authorization header + let auth_value = headers + .get(header::AUTHORIZATION) + .and_then(|v| v.to_str().ok()); + + // Check authentication + if !extract_bearer_token(auth_value, &server.auth_token) { + return Response::builder() + .status(StatusCode::UNAUTHORIZED) + .body(Body::from(r#"event: error\ndata: Unauthorized\n\n"#)) + .unwrap(); + } + + // For now, return an SSE stream that sends an endpoint message + // In a full implementation, this would maintain a persistent connection + // for server-initiated notifications + let sse_data = "event: endpoint\ndata: /mcp\n\n"; + + Response::builder() + .status(StatusCode::OK) + .header(header::CONTENT_TYPE, "text/event-stream") + .header(header::CACHE_CONTROL, "no-cache") + .header(header::CONNECTION, "keep-alive") + .body(Body::from(sse_data)) + .unwrap() +} + +/// Health check endpoint +async fn health_check() -> Response { + Response::builder() + .status(StatusCode::OK) + .header(header::CONTENT_TYPE, "application/json") + .body(Body::from(r#"{"status": "ok"}"#)) + .unwrap() +} + #[cfg(test)] mod tests { use super::*; From 8fb0c5261ac79690020ea0a6c16c51fc7981d182 Mon Sep 17 00:00:00 2001 From: linhdmn Date: Fri, 24 Apr 2026 15:13:54 +0700 Subject: [PATCH 2/4] feat: add database config structure for future PostgreSQL support - Add DatabaseConfig struct to project.rs with backend, path, pool_size, ssl_enabled - Add #[serde(default)] to ProjectConfig to handle missing database field - Add init_db_with_config function to schema.rs (SQLite-only, PG returns error) - Update HLD changelog to note PostgreSQL support pending cozo update Note: PostgreSQL support requires cozo to add PG storage backend. Currently only SQLite is supported via cozo 0.2. --- src/config/project.rs | 41 ++++++++++++++++++++++++++++++++++++ src/db/schema.rs | 49 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 90 insertions(+) diff --git a/src/config/project.rs b/src/config/project.rs index 9b64328..a3adf69 100644 --- a/src/config/project.rs +++ b/src/config/project.rs @@ -2,11 +2,13 @@ use serde::{Deserialize, Serialize}; use std::path::PathBuf; #[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] pub struct ProjectConfig { pub project: ProjectSettings, pub indexer: IndexerConfig, pub mcp: McpConfig, pub documentation: DocConfig, + pub database: DatabaseConfig, pub microservice: Option, } @@ -61,6 +63,44 @@ pub struct McpConfig { pub auto_index_on_db_write: bool, } +/// Database configuration for LeanKG +/// SQLite (CozoDB embedded) is the default, PostgreSQL can be used for +/// multi-client HTTP server deployments +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DatabaseConfig { + /// Database backend: "sqlite" or "postgres" + #[serde(default = "default_backend")] + pub backend: String, + /// For SQLite: path to .leankg directory + /// For PostgreSQL: connection string (e.g., "postgres://user:pass@localhost:5432/leankg") + pub path: Option, + /// PostgreSQL connection pool size (only for postgres backend) + #[serde(default = "default_pool_size")] + pub pool_size: u32, + /// Enable SSL for PostgreSQL connections + #[serde(default)] + pub ssl_enabled: bool, +} + +fn default_backend() -> String { + "sqlite".to_string() +} + +fn default_pool_size() -> u32 { + 10 +} + +impl Default for DatabaseConfig { + fn default() -> Self { + Self { + backend: default_backend(), + path: None, + pool_size: default_pool_size(), + ssl_enabled: false, + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct DocConfig { pub output: PathBuf, @@ -105,6 +145,7 @@ impl Default for ProjectConfig { output: PathBuf::from("./docs"), templates: vec!["agents".to_string(), "claude".to_string()], }, + database: DatabaseConfig::default(), microservice: None, } } diff --git a/src/db/schema.rs b/src/db/schema.rs index 92bf591..ed7795f 100644 --- a/src/db/schema.rs +++ b/src/db/schema.rs @@ -1,6 +1,8 @@ use cozo::{Db, SqliteStorage}; use std::path::Path; +use crate::config::project::DatabaseConfig; + pub type CozoDb = Db; pub fn init_db(db_path: &Path) -> Result> { @@ -34,6 +36,53 @@ pub fn init_db(db_path: &Path) -> Result> { Ok(db) } +/// Initialize database with configuration-based backend selection +/// Currently only supports SQLite. PostgreSQL support will be added when cozo adds it. +/// The config struct is prepared for future use. +pub fn init_db_with_config(config: &DatabaseConfig) -> Result> { + match config.backend.as_str() { + "postgres" | "postgresql" => { + Err("PostgreSQL support requires cozo version with PG storage. Currently only SQLite is supported.".into()) + } + _ => { + // Default to SQLite if path is provided, otherwise use default location + if let Some(path) = &config.path { + init_sqlite_at_path(path) + } else { + // Use default path in current directory + init_sqlite_default() + } + } + } +} + +fn init_sqlite_at_path(path_str: &str) -> Result> { + let db = cozo::new_cozo_sqlite(path_str.to_string())?; + + // Set memory limits for SQLite (CozoDB backend) + let pragmas = [ + "PRAGMA cache_size = -64000", + "PRAGMA mmap_size = 268435456", + "PRAGMA temp_store = MEMORY", + "PRAGMA synchronous = NORMAL", + "PRAGMA journal_mode = WAL", + "PRAGMA wal_autocheckpoint = 100", + ]; + for pragma in pragmas { + if let Err(e) = db.run_script(pragma, Default::default()) { + tracing::debug!("Pragma '{}' failed (may not be supported): {}", pragma, e); + } + } + + init_schema(&db)?; + + Ok(db) +} + +fn init_sqlite_default() -> Result> { + init_sqlite_at_path(".leankg/leankg.db") +} + fn init_schema(db: &CozoDb) -> Result<(), Box> { let check_relations = r#"::relations"#; let relations_result = db.run_script(check_relations, Default::default())?; From ff1be218d5a921cf66ea2f3c6c60d1dbebf7591b Mon Sep 17 00:00:00 2001 From: linhdmn Date: Fri, 24 Apr 2026 15:43:13 +0700 Subject: [PATCH 3/4] fix: clarify tool result handling and document unused PostgreSQL fields - Add comment explaining when tool results use as_str() vs wrap_response() - Mark pool_size and ssl_enabled as reserved for future PostgreSQL support --- src/config/project.rs | 2 ++ src/mcp/server.rs | 2 ++ 2 files changed, 4 insertions(+) diff --git a/src/config/project.rs b/src/config/project.rs index a3adf69..de06d17 100644 --- a/src/config/project.rs +++ b/src/config/project.rs @@ -75,9 +75,11 @@ pub struct DatabaseConfig { /// For PostgreSQL: connection string (e.g., "postgres://user:pass@localhost:5432/leankg") pub path: Option, /// PostgreSQL connection pool size (only for postgres backend) + /// Reserved for future PostgreSQL support - currently unused #[serde(default = "default_pool_size")] pub pool_size: u32, /// Enable SSL for PostgreSQL connections + /// Reserved for future PostgreSQL support - currently unused #[serde(default)] pub ssl_enabled: bool, } diff --git a/src/mcp/server.rs b/src/mcp/server.rs index 621c833..4d476f4 100644 --- a/src/mcp/server.rs +++ b/src/mcp/server.rs @@ -1022,6 +1022,8 @@ async fn process_jsonrpc_request( .map_err(|e| e.to_string())?; // Format as MCP tool result + // Tool results are either plain strings (as_str()) or structured JSON + // that needs to be wrapped in MCP response format let content_str = if let Some(s) = result.as_str() { s.to_string() } else { From 94c446731fccd0b830d7fc3ea184b3926cf03da4 Mon Sep 17 00:00:00 2001 From: linhdmn Date: Sat, 25 Apr 2026 00:59:48 +0700 Subject: [PATCH 4/4] fix: remove dead code and use constant-time token comparison - Remove unused SseEventType enum from HTTP transport - Use subtle::ConstantTimeEq for bearer token comparison to prevent timing attacks --- Cargo.lock | 1 + Cargo.toml | 1 + src/mcp/server.rs | 14 +++++--------- 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d3f34fb..185d7f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1403,6 +1403,7 @@ dependencies = [ "serde_yaml", "sha2", "similar", + "subtle", "sysinfo", "tar", "tempfile", diff --git a/Cargo.toml b/Cargo.toml index 291272d..7aabeb5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ tracing-subscriber = "0.3" walkdir = "2" glob = "0.3" futures-util = "0.3" +subtle = "2" sha2 = "0.10" rayon = "1.10" pulldown-cmark = "0.12" diff --git a/src/mcp/server.rs b/src/mcp/server.rs index 4d476f4..39a8378 100644 --- a/src/mcp/server.rs +++ b/src/mcp/server.rs @@ -25,6 +25,7 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; +use subtle::ConstantTimeEq; use tokio::sync::RwLock as TokioRwLock; use tower_http::cors::{Any, CorsLayer}; @@ -869,14 +870,8 @@ mod json_rpc_code { pub const INTERNAL_ERROR: i32 = -32603; } -/// SSE event types for MCP protocol -#[derive(Debug, Clone, Copy)] -enum SseEventType { - Message, - Endpoint, -} - -/// Extract bearer token from Authorization header +/// Extract bearer token from Authorization header using constant-time comparison +/// to prevent timing attacks on bearer tokens. fn extract_bearer_token(auth_header: Option<&str>, token: &Option) -> bool { if token.is_none() { return true; // No auth required @@ -885,7 +880,8 @@ fn extract_bearer_token(auth_header: Option<&str>, token: &Option) -> bo if let Some(auth) = auth_header { if let Some(stripped) = auth.strip_prefix("Bearer ") { - return stripped == token.as_str(); + // Use constant-time comparison to prevent timing attacks + return stripped.as_bytes().ct_eq(token.as_bytes()).into(); } } false