From 346384846dc021acb0e85ee8b677166d27f0649f Mon Sep 17 00:00:00 2001 From: cptrodgers Date: Mon, 30 Mar 2026 21:19:51 +0700 Subject: [PATCH 1/2] Add logger, file log --- .gitignore | 1 + src/config.rs | 21 +++ src/display.rs | 7 +- src/logger/entry.rs | 113 ++++++++++++ src/logger/file_sink.rs | 385 ++++++++++++++++++++++++++++++++++++++++ src/logger/mod.rs | 221 +++++++++++++++++++++++ src/logger/sink.rs | 13 ++ src/logger/tui_sink.rs | 23 +++ src/main.rs | 54 ++++++ src/proxy.rs | 30 ++-- src/tui/state.rs | 91 +--------- src/widgets.rs | 53 +++--- 12 files changed, 870 insertions(+), 142 deletions(-) create mode 100644 src/logger/entry.rs create mode 100644 src/logger/file_sink.rs create mode 100644 src/logger/mod.rs create mode 100644 src/logger/sink.rs create mode 100644 src/logger/tui_sink.rs diff --git a/.gitignore b/.gitignore index da83350..edb1081 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ dist/ node_modules mcpr.toml +logs diff --git a/src/config.rs b/src/config.rs index 3b88a79..0b12d36 100644 --- a/src/config.rs +++ b/src/config.rs @@ -125,6 +125,18 @@ struct FileTunnelConfig { subdomain: Option, } +/// `[logging]` table in config file +#[derive(serde::Deserialize, Default)] +#[serde(default)] +struct FileLoggingConfig { + /// Enable JSONL file logging. + file: bool, + /// Directory for log files (default: "./logs"). + dir: Option, + /// Rotation strategy: "daily" or "size:50MB" (default: "daily"). + rotation: Option, +} + /// Config file format (mcpr.toml) #[derive(serde::Deserialize, Default)] #[serde(default)] @@ -145,6 +157,9 @@ struct FileConfig { // -- Tunnel client -- tunnel: FileTunnelConfig, + // -- Logging -- + logging: FileLoggingConfig, + max_request_body_size: Option, max_response_body_size: Option, max_concurrent_upstream: Option, @@ -239,6 +254,9 @@ pub struct GatewayConfig { pub max_concurrent_upstream: Option, pub connect_timeout: Option, pub request_timeout: Option, + pub log_file: bool, + pub log_dir: Option, + pub log_rotation: Option, } impl GatewayConfig { @@ -559,6 +577,9 @@ fn load_gateway(cli: Cli, file: FileConfig, config_path: Option, + pub session_id: Option, + pub status: u16, + pub note: String, + pub upstream_url: Option, + pub resp_size: Option, + pub duration_ms: Option, + /// Time spent waiting for upstream (network). Proxy overhead = duration_ms - upstream_ms. + pub upstream_ms: Option, + /// JSON-RPC error code from the response body (if the response is a JSON-RPC error). + pub jsonrpc_error: Option<(i64, String)>, + /// Extra detail: tool name for tools/call, resource URI for resources/read, etc. + pub detail: Option, +} + +impl LogEntry { + pub fn new(method: &str, path: &str, status: u16, note: &str) -> Self { + let now = chrono::Utc::now(); + Self { + timestamp: now + .with_timezone(&chrono::Local) + .format("%H:%M:%S") + .to_string(), + timestamp_utc: now.to_rfc3339_opts(chrono::SecondsFormat::Millis, true), + method: method.to_string(), + path: path.to_string(), + mcp_method: None, + session_id: None, + status, + note: note.to_string(), + upstream_url: None, + resp_size: None, + duration_ms: None, + upstream_ms: None, + jsonrpc_error: None, + detail: None, + } + } + + pub fn session_id(mut self, id: &str) -> Self { + self.session_id = Some(id.to_string()); + self + } + + pub fn maybe_session_id(mut self, id: Option<&str>) -> Self { + self.session_id = id.map(String::from); + self + } + + pub fn mcp_method(mut self, m: &str) -> Self { + self.mcp_method = Some(m.to_string()); + self + } + + pub fn upstream(mut self, url: &str) -> Self { + self.upstream_url = Some(url.to_string()); + self + } + + pub fn size(mut self, bytes: usize) -> Self { + self.resp_size = Some(bytes); + self + } + + pub fn duration(mut self, start: Instant) -> Self { + self.duration_ms = Some(start.elapsed().as_millis() as u64); + self + } + + pub fn upstream_duration(mut self, ms: u64) -> Self { + self.upstream_ms = Some(ms); + self + } + + pub fn jsonrpc_error(mut self, code: i64, message: &str) -> Self { + self.jsonrpc_error = Some((code, message.to_string())); + self + } + + pub fn detail(mut self, d: &str) -> Self { + self.detail = Some(d.to_string()); + self + } + + pub fn maybe_detail(mut self, d: Option<&str>) -> Self { + self.detail = d.map(String::from); + self + } +} diff --git a/src/logger/file_sink.rs b/src/logger/file_sink.rs new file mode 100644 index 0000000..ea2c36b --- /dev/null +++ b/src/logger/file_sink.rs @@ -0,0 +1,385 @@ +use std::fs::{self, File, OpenOptions}; +use std::io::{BufWriter, Write}; +use std::path::{Path, PathBuf}; +use std::sync::Mutex; + +use super::entry::LogEntry; +use super::sink::LogSink; + +const DEFAULT_MAX_FILE_SIZE: u64 = 50 * 1024 * 1024; // 50 MB +pub const DEFAULT_MAX_FILES: usize = 10; +const BUFFER_CAPACITY: usize = 32 * 1024; // 32 KB + +/// Rotation strategy for log files. +#[derive(Clone, Debug)] +pub enum Rotation { + /// Rotate when the file exceeds a size threshold (in bytes). + Size(u64), + /// Rotate daily (new file per calendar day). + Daily, +} + +impl Default for Rotation { + fn default() -> Self { + Rotation::Size(DEFAULT_MAX_FILE_SIZE) + } +} + +/// Configuration for the file sink. +#[derive(Clone, Debug)] +pub struct FileSinkConfig { + pub dir: PathBuf, + pub rotation: Rotation, + /// Maximum number of log files to keep. Oldest are deleted when exceeded. + /// Defaults to 10. This caps total disk usage to roughly `max_files * rotation_size`. + pub max_files: usize, +} + +/// JSONL file sink with buffered writes and rotation. +/// +/// Writes one JSON object per line to `{dir}/mcpr-{date}.log`. +/// Rotation happens by size or daily, depending on config. +pub struct FileSink { + config: FileSinkConfig, + inner: Mutex, +} + +struct FileSinkInner { + writer: BufWriter, + current_path: PathBuf, + current_date: String, + bytes_written: u64, +} + +impl FileSink { + pub fn new(config: FileSinkConfig) -> std::io::Result { + fs::create_dir_all(&config.dir)?; + let (path, date) = Self::log_path(&config.dir); + let file = Self::open_append(&path)?; + let bytes_written = file.metadata().map(|m| m.len()).unwrap_or(0); + Ok(Self { + config, + inner: Mutex::new(FileSinkInner { + writer: BufWriter::with_capacity(BUFFER_CAPACITY, file), + current_path: path, + current_date: date, + bytes_written, + }), + }) + } + + fn log_path(dir: &Path) -> (PathBuf, String) { + let date = chrono::Utc::now().format("%Y-%m-%d").to_string(); + let path = dir.join(format!("mcpr-{date}.log")); + (path, date) + } + + fn open_append(path: &Path) -> std::io::Result { + OpenOptions::new().create(true).append(true).open(path) + } + + fn should_rotate(&self, inner: &FileSinkInner) -> bool { + match &self.config.rotation { + Rotation::Size(max) => inner.bytes_written >= *max, + Rotation::Daily => { + let today = chrono::Utc::now().format("%Y-%m-%d").to_string(); + inner.current_date != today + } + } + } + + fn rotate(&self, inner: &mut FileSinkInner) -> std::io::Result<()> { + // Flush the current writer before rotating + inner.writer.flush()?; + + match &self.config.rotation { + Rotation::Size(_) => { + // Rename current file with a sequence number + let mut seq = 1u32; + loop { + let rotated = inner.current_path.with_extension(format!("{seq}.log")); + if !rotated.exists() { + fs::rename(&inner.current_path, &rotated)?; + break; + } + seq += 1; + } + // Re-open same path (now empty) + let file = Self::open_append(&inner.current_path)?; + inner.writer = BufWriter::with_capacity(BUFFER_CAPACITY, file); + inner.bytes_written = 0; + } + Rotation::Daily => { + // Open new file with today's date + let (path, date) = Self::log_path(&self.config.dir); + let existing_size = path.metadata().map(|m| m.len()).unwrap_or(0); + let file = Self::open_append(&path)?; + inner.current_path = path; + inner.current_date = date; + inner.writer = BufWriter::with_capacity(BUFFER_CAPACITY, file); + inner.bytes_written = existing_size; + } + } + Ok(()) + } + + /// Delete oldest log files if the total count exceeds `max_files`. + fn cleanup_old_files(&self) { + let max = self.config.max_files; + if max == 0 { + return; + } + + let Ok(entries) = fs::read_dir(&self.config.dir) else { + return; + }; + + let mut log_files: Vec = entries + .filter_map(|e| e.ok()) + .map(|e| e.path()) + .filter(|p| { + p.file_name() + .and_then(|n| n.to_str()) + .is_some_and(|n| n.starts_with("mcpr-") && n.ends_with(".log")) + }) + .collect(); + + if log_files.len() <= max { + return; + } + + // Sort by modification time (oldest first) + log_files.sort_by_key(|p| p.metadata().and_then(|m| m.modified()).ok()); + + let to_remove = log_files.len() - max; + for path in log_files.into_iter().take(to_remove) { + let _ = fs::remove_file(path); + } + } +} + +impl LogSink for FileSink { + fn emit(&self, entry: &LogEntry) { + let mut inner = self.inner.lock().unwrap(); + + // Check rotation before writing + if self.should_rotate(&inner) { + match self.rotate(&mut inner) { + Ok(()) => self.cleanup_old_files(), + Err(e) => { + eprintln!("mcpr: log rotation failed: {e}"); + return; + } + } + } + + // Serialize and write + match serde_json::to_string(entry) { + Ok(line) => { + let bytes = line.len() as u64 + 1; // +1 for newline + if let Err(e) = writeln!(inner.writer, "{line}") { + eprintln!("mcpr: log write failed: {e}"); + } else { + inner.bytes_written += bytes; + } + } + Err(e) => { + eprintln!("mcpr: log serialize failed: {e}"); + } + } + } + + fn flush(&self) { + if let Ok(mut inner) = self.inner.lock() { + let _ = inner.writer.flush(); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + + fn make_entry(method: &str, path: &str, status: u16) -> LogEntry { + LogEntry::new(method, path, status, "test") + } + + #[test] + fn writes_jsonl_to_file() { + let dir = TempDir::new().unwrap(); + let config = FileSinkConfig { + dir: dir.path().to_path_buf(), + rotation: Rotation::Size(DEFAULT_MAX_FILE_SIZE), + max_files: DEFAULT_MAX_FILES, + }; + let sink = FileSink::new(config).unwrap(); + + sink.emit(&make_entry("POST", "/mcp", 200)); + sink.emit(&make_entry("GET", "/health", 200)); + sink.flush(); + + // Read the log file + let entries: Vec = fs::read_dir(dir.path()) + .unwrap() + .filter_map(|e| e.ok()) + .filter(|e| e.path().extension().is_some_and(|ext| ext == "log")) + .flat_map(|e| { + fs::read_to_string(e.path()) + .unwrap() + .lines() + .map(String::from) + .collect::>() + }) + .collect(); + + assert_eq!(entries.len(), 2); + + // Verify each line is valid JSON with expected fields + let first: serde_json::Value = serde_json::from_str(&entries[0]).unwrap(); + assert_eq!(first["method"], "POST"); + assert_eq!(first["path"], "/mcp"); + assert_eq!(first["status"], 200); + assert!(first["timestamp_utc"].as_str().is_some()); + + let second: serde_json::Value = serde_json::from_str(&entries[1]).unwrap(); + assert_eq!(second["method"], "GET"); + assert_eq!(second["path"], "/health"); + } + + #[test] + fn rotates_by_size() { + let dir = TempDir::new().unwrap(); + let config = FileSinkConfig { + dir: dir.path().to_path_buf(), + rotation: Rotation::Size(100), // Very small threshold + max_files: DEFAULT_MAX_FILES, + }; + let sink = FileSink::new(config).unwrap(); + + // Write enough entries to trigger rotation + for i in 0..10 { + sink.emit(&make_entry("POST", &format!("/path/{i}"), 200)); + } + sink.flush(); + + // Should have multiple log files after rotation + let log_files: Vec<_> = fs::read_dir(dir.path()) + .unwrap() + .filter_map(|e| e.ok()) + .filter(|e| { + e.path() + .file_name() + .unwrap() + .to_str() + .unwrap() + .starts_with("mcpr-") + }) + .collect(); + + assert!( + log_files.len() > 1, + "expected multiple log files after rotation, got {}", + log_files.len() + ); + } + + #[test] + fn serializes_all_fields() { + let dir = TempDir::new().unwrap(); + let config = FileSinkConfig { + dir: dir.path().to_path_buf(), + rotation: Rotation::Size(DEFAULT_MAX_FILE_SIZE), + max_files: DEFAULT_MAX_FILES, + }; + let sink = FileSink::new(config).unwrap(); + + let entry = LogEntry::new("POST", "/mcp", 200, "rewritten") + .mcp_method("tools/call") + .detail("get_weather") + .session_id("sid-123") + .upstream("http://localhost:9000/mcp") + .size(147) + .upstream_duration(7) + .jsonrpc_error(-32602, "Invalid params"); + + sink.emit(&entry); + sink.flush(); + + let log_file = fs::read_dir(dir.path()) + .unwrap() + .filter_map(|e| e.ok()) + .find(|e| e.path().extension().is_some_and(|ext| ext == "log")) + .unwrap(); + + let content = fs::read_to_string(log_file.path()).unwrap(); + let val: serde_json::Value = serde_json::from_str(content.trim()).unwrap(); + + assert_eq!(val["method"], "POST"); + assert_eq!(val["path"], "/mcp"); + assert_eq!(val["status"], 200); + assert_eq!(val["note"], "rewritten"); + assert_eq!(val["mcp_method"], "tools/call"); + assert_eq!(val["detail"], "get_weather"); + assert_eq!(val["session_id"], "sid-123"); + assert_eq!(val["upstream_url"], "http://localhost:9000/mcp"); + assert_eq!(val["resp_size"], 147); + assert_eq!(val["upstream_ms"], 7); + assert_eq!(val["jsonrpc_error"][0], -32602); + assert_eq!(val["jsonrpc_error"][1], "Invalid params"); + assert!(val["timestamp_utc"].as_str().unwrap().contains("T")); + } + + #[test] + fn creates_dir_if_missing() { + let dir = TempDir::new().unwrap(); + let nested = dir.path().join("logs").join("nested"); + let config = FileSinkConfig { + dir: nested.clone(), + rotation: Rotation::Size(DEFAULT_MAX_FILE_SIZE), + max_files: DEFAULT_MAX_FILES, + }; + + let sink = FileSink::new(config).unwrap(); + sink.emit(&make_entry("GET", "/test", 200)); + sink.flush(); + + assert!(nested.exists()); + } + + #[test] + fn cleans_up_old_files() { + let dir = TempDir::new().unwrap(); + let config = FileSinkConfig { + dir: dir.path().to_path_buf(), + rotation: Rotation::Size(100), // Very small to trigger many rotations + max_files: 3, + }; + let sink = FileSink::new(config).unwrap(); + + // Write enough entries to trigger many rotations + for i in 0..50 { + sink.emit(&make_entry("POST", &format!("/path/{i}"), 200)); + } + sink.flush(); + + let log_files: Vec<_> = fs::read_dir(dir.path()) + .unwrap() + .filter_map(|e| e.ok()) + .filter(|e| { + e.path() + .file_name() + .unwrap() + .to_str() + .unwrap() + .starts_with("mcpr-") + }) + .collect(); + + assert!( + log_files.len() <= 3, + "expected at most 3 log files, got {}", + log_files.len() + ); + } +} diff --git a/src/logger/mod.rs b/src/logger/mod.rs new file mode 100644 index 0000000..28708c7 --- /dev/null +++ b/src/logger/mod.rs @@ -0,0 +1,221 @@ +mod entry; +pub mod file_sink; +mod sink; +mod tui_sink; + +pub use entry::LogEntry; +pub use file_sink::{DEFAULT_MAX_FILES, FileSink, FileSinkConfig, Rotation}; +pub use sink::LogSink; +pub use tui_sink::TuiSink; + +use tokio::sync::mpsc; + +const CHANNEL_CAPACITY: usize = 4096; + +/// Routes log entries to multiple sinks via a bounded async channel. +/// +/// The proxy hot path calls `emit()` which does a non-blocking channel send. +/// A background tokio task reads from the channel and fans out to all sinks. +#[derive(Clone)] +pub struct LogRouter { + tx: mpsc::Sender, +} + +/// Handle returned by `LogRouter::new` to manage the background task. +pub struct LogRouterHandle { + pub router: LogRouter, + task: tokio::task::JoinHandle<()>, + shutdown: mpsc::Sender<()>, +} + +impl LogRouterHandle { + /// Graceful shutdown: signals the background task and waits for it to drain. + pub async fn shutdown(self) { + let _ = self.shutdown.send(()).await; + let _ = self.task.await; + } +} + +impl LogRouter { + /// Create a new router with the given sinks. + /// + /// Spawns a background tokio task that reads entries and fans out to sinks. + /// Returns a handle that owns both the router (for cloning into AppState) + /// and the background task (for graceful shutdown). + pub fn start(sinks: Vec>) -> LogRouterHandle { + let (tx, rx) = mpsc::channel(CHANNEL_CAPACITY); + let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1); + + let task = tokio::spawn(router_task(rx, shutdown_rx, sinks)); + + LogRouterHandle { + router: LogRouter { tx }, + task, + shutdown: shutdown_tx, + } + } + + /// Send a log entry to all sinks. Non-blocking — drops the entry if the + /// channel is full (never blocks the proxy request path). + pub fn emit(&self, entry: LogEntry) { + let _ = self.tx.try_send(entry); + } +} + +async fn router_task( + mut rx: mpsc::Receiver, + mut shutdown_rx: mpsc::Receiver<()>, + sinks: Vec>, +) { + loop { + tokio::select! { + Some(entry) = rx.recv() => { + for sink in &sinks { + sink.emit(&entry); + } + } + _ = shutdown_rx.recv() => { + // Drain remaining entries + while let Ok(entry) = rx.try_recv() { + for sink in &sinks { + sink.emit(&entry); + } + } + // Flush all sinks + for sink in &sinks { + sink.flush(); + } + return; + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::{Arc, Mutex}; + + /// Test sink that records entries in memory. + struct MemorySink { + entries: Arc>>, + flush_count: Arc>, + } + + impl MemorySink { + fn new() -> (Self, Arc>>, Arc>) { + let entries = Arc::new(Mutex::new(Vec::new())); + let flush_count = Arc::new(Mutex::new(0u32)); + ( + Self { + entries: entries.clone(), + flush_count: flush_count.clone(), + }, + entries, + flush_count, + ) + } + } + + impl LogSink for MemorySink { + fn emit(&self, entry: &LogEntry) { + self.entries.lock().unwrap().push(entry.clone()); + } + + fn flush(&self) { + *self.flush_count.lock().unwrap() += 1; + } + } + + #[tokio::test] + async fn routes_to_single_sink() { + let (sink, entries, _) = MemorySink::new(); + let handle = LogRouter::start(vec![Box::new(sink)]); + + handle + .router + .emit(LogEntry::new("POST", "/mcp", 200, "test")); + handle + .router + .emit(LogEntry::new("GET", "/health", 200, "test")); + + handle.shutdown().await; + + let entries = entries.lock().unwrap(); + assert_eq!(entries.len(), 2); + assert_eq!(entries[0].method, "POST"); + assert_eq!(entries[1].method, "GET"); + } + + #[tokio::test] + async fn routes_to_multiple_sinks() { + let (sink1, entries1, _) = MemorySink::new(); + let (sink2, entries2, _) = MemorySink::new(); + let handle = LogRouter::start(vec![Box::new(sink1), Box::new(sink2)]); + + handle + .router + .emit(LogEntry::new("POST", "/mcp", 200, "test")); + + handle.shutdown().await; + + assert_eq!(entries1.lock().unwrap().len(), 1); + assert_eq!(entries2.lock().unwrap().len(), 1); + } + + #[tokio::test] + async fn flushes_on_shutdown() { + let (sink, _, flush_count) = MemorySink::new(); + let handle = LogRouter::start(vec![Box::new(sink)]); + + handle + .router + .emit(LogEntry::new("POST", "/mcp", 200, "test")); + handle.shutdown().await; + + assert_eq!(*flush_count.lock().unwrap(), 1); + } + + #[tokio::test] + async fn drains_channel_on_shutdown() { + let (sink, entries, _) = MemorySink::new(); + let handle = LogRouter::start(vec![Box::new(sink)]); + + // Emit several entries quickly + for i in 0..100 { + handle + .router + .emit(LogEntry::new("POST", &format!("/path/{i}"), 200, "test")); + } + + handle.shutdown().await; + + // All entries should have been drained + assert_eq!(entries.lock().unwrap().len(), 100); + } + + #[tokio::test] + async fn clone_router_shares_channel() { + let (sink, entries, _) = MemorySink::new(); + let handle = LogRouter::start(vec![Box::new(sink)]); + + let router_clone = handle.router.clone(); + handle.router.emit(LogEntry::new("POST", "/a", 200, "test")); + router_clone.emit(LogEntry::new("GET", "/b", 200, "test")); + + handle.shutdown().await; + + let entries = entries.lock().unwrap(); + assert_eq!(entries.len(), 2); + } + + #[tokio::test] + async fn works_with_no_sinks() { + let handle = LogRouter::start(vec![]); + handle + .router + .emit(LogEntry::new("POST", "/mcp", 200, "test")); + handle.shutdown().await; + // Should not panic + } +} diff --git a/src/logger/sink.rs b/src/logger/sink.rs new file mode 100644 index 0000000..faee87e --- /dev/null +++ b/src/logger/sink.rs @@ -0,0 +1,13 @@ +use super::entry::LogEntry; + +/// Trait for log output backends. +/// +/// Implement this to add new log destinations (file, database, metrics, etc.). +/// Sinks receive cloned `LogEntry` values from the `LogRouter` background task. +pub trait LogSink: Send + Sync + 'static { + /// Process a single log entry. Called from the router's background task. + fn emit(&self, entry: &LogEntry); + + /// Flush any buffered data. Called during graceful shutdown. + fn flush(&self) {} +} diff --git a/src/logger/tui_sink.rs b/src/logger/tui_sink.rs new file mode 100644 index 0000000..cd9eb12 --- /dev/null +++ b/src/logger/tui_sink.rs @@ -0,0 +1,23 @@ +use crate::tui::SharedTuiState; + +use super::entry::LogEntry; +use super::sink::LogSink; + +/// Sink that pushes log entries to the TUI dashboard. +/// +/// Wraps the existing `SharedTuiState` so the TUI continues to work unchanged. +pub struct TuiSink { + state: SharedTuiState, +} + +impl TuiSink { + pub fn new(state: SharedTuiState) -> Self { + Self { state } + } +} + +impl LogSink for TuiSink { + fn emit(&self, entry: &LogEntry) { + self.state.lock().unwrap().push_log(entry.clone()); + } +} diff --git a/src/main.rs b/src/main.rs index fa4afee..a819f39 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,7 @@ mod config; mod display; mod jsonrpc; +pub mod logger; mod onboarding; mod proxy; mod relay; @@ -20,6 +21,7 @@ use tower_http::cors::{Any, CorsLayer}; use config::{GatewayConfig, Mode}; use display::log_startup; +use logger::{DEFAULT_MAX_FILES, FileSink, FileSinkConfig, LogRouter, LogSink, Rotation, TuiSink}; use proxy::proxy_routes; use rewrite::RewriteConfig; use session::MemorySessionStore; @@ -55,6 +57,7 @@ pub struct AppState { pub rewrite_config: Arc>, pub http_client: reqwest::Client, pub tui_state: SharedTuiState, + pub logger: LogRouter, pub sessions: MemorySessionStore, pub max_request_body: usize, pub max_response_body: usize, @@ -200,6 +203,38 @@ async fn run_gateway(cfg: GatewayConfig) { let request_timeout = Duration::from_secs(cfg.request_timeout.unwrap_or(DEFAULT_REQUEST_TIMEOUT_SECS)); + // Build log sinks + let mut sinks: Vec> = vec![Box::new(TuiSink::new(tui_state.clone()))]; + + if cfg.log_file { + let rotation = match cfg.log_rotation.as_deref() { + Some(s) if s.starts_with("size:") => { + let size_str = s.trim_start_matches("size:"); + let bytes = parse_size(size_str).unwrap_or(50 * 1024 * 1024); + Rotation::Size(bytes) + } + _ => Rotation::Daily, + }; + let dir = cfg.log_dir.unwrap_or_else(|| "./logs".to_string()); + match FileSink::new(FileSinkConfig { + dir: std::path::PathBuf::from(&dir), + rotation, + max_files: DEFAULT_MAX_FILES, + }) { + Ok(sink) => { + sinks.push(Box::new(sink)); + } + Err(e) => { + eprintln!( + "{}: failed to init file logger: {e}", + colored::Colorize::red("error"), + ); + } + } + } + + let log_handle = LogRouter::start(sinks); + let state = AppState { mcp_upstream: mcp.clone(), widget_source, @@ -211,6 +246,7 @@ async fn run_gateway(cfg: GatewayConfig) { .build() .expect("Failed to build HTTP client"), tui_state: tui_state.clone(), + logger: log_handle.router.clone(), sessions: MemorySessionStore::new(), max_request_body: cfg .max_request_body_size @@ -259,6 +295,24 @@ async fn run_gateway(cfg: GatewayConfig) { }); tui_handle.await.unwrap(); + + // Gracefully flush log sinks + log_handle.shutdown().await; +} + +/// Parse a human-readable size string like "50MB", "100KB", "1GB" into bytes. +fn parse_size(s: &str) -> Option { + let s = s.trim(); + let (num_str, multiplier) = if let Some(n) = s.strip_suffix("GB") { + (n, 1024 * 1024 * 1024) + } else if let Some(n) = s.strip_suffix("MB") { + (n, 1024 * 1024) + } else if let Some(n) = s.strip_suffix("KB") { + (n, 1024) + } else { + (s, 1) + }; + num_str.trim().parse::().ok().map(|n| n * multiplier) } /// Validate MCP URL format at startup. Exits with an error for clearly invalid URLs, diff --git a/src/proxy.rs b/src/proxy.rs index ab5dcb9..c68f315 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -12,11 +12,10 @@ use futures_util::StreamExt; use serde_json::Value; use crate::AppState; -use crate::display::log_request; use crate::jsonrpc::{self, McpMethod}; +use crate::logger::LogEntry; use crate::rewrite::rewrite_response; use crate::session::{self, SessionState, SessionStore}; -use crate::tui::state::LogEntry; use crate::widgets::{ fetch_widget_html, list_widgets, serve_studio, serve_widget_asset, serve_widget_html, }; @@ -232,8 +231,7 @@ async fn handle_request( let upstream_ms = upstream_start.elapsed().as_millis() as u64; let status = resp.status().as_u16(); let resp_headers = resp.headers().clone(); - log_request( - &state.tui_state, + state.logger.emit( LogEntry::new("GET", path, status, "sse") .upstream(&upstream_url) .upstream_duration(upstream_ms) @@ -247,8 +245,7 @@ async fn handle_request( } Err(e) => { let upstream_ms = upstream_start.elapsed().as_millis() as u64; - log_request( - &state.tui_state, + state.logger.emit( LogEntry::new("GET", path, 502, "upstream error") .upstream(&upstream_url) .upstream_duration(upstream_ms) @@ -317,8 +314,7 @@ async fn handle_mcp_post( Ok(r) => r, Err(e) => { let upstream_ms = upstream_start.elapsed().as_millis() as u64; - log_request( - &state.tui_state, + state.logger.emit( LogEntry::new("POST", path, 502, "upstream error") .mcp_method(method_str) .maybe_detail(call_detail.as_deref()) @@ -393,11 +389,10 @@ async fn handle_mcp_post( if let Some((code, ref msg)) = rpc_error { entry = entry.jsonrpc_error(code, msg); } - log_request(&state.tui_state, entry); + state.logger.emit(entry); build_response(status, &resp_headers, Body::from(body)) } else { - log_request( - &state.tui_state, + state.logger.emit( LogEntry::new("POST", path, status, "passthrough") .mcp_method(method_str) .maybe_detail(call_detail.as_deref()) @@ -463,8 +458,7 @@ async fn handle_resources_read( drop(config); let body = serde_json::to_vec(&json_body).unwrap_or_default(); - log_request( - &state.tui_state, + state.logger.emit( LogEntry::new("POST", "/*", 200, "intercepted") .mcp_method(jsonrpc::RESOURCES_READ) .size(body.len()) @@ -516,8 +510,7 @@ async fn forward_and_passthrough( ); drop(config); let rewritten_bytes = rewritten.into_bytes(); - log_request( - &state.tui_state, + state.logger.emit( LogEntry::new(method.as_str(), log_path, status, "rewritten") .upstream(url) .size(rewritten_bytes.len()) @@ -526,8 +519,7 @@ async fn forward_and_passthrough( ); build_response(status, &resp_headers, Body::from(rewritten_bytes)) } else { - log_request( - &state.tui_state, + state.logger.emit( LogEntry::new(method.as_str(), log_path, status, "passthrough") .upstream(url) .size(bytes.len()) @@ -539,8 +531,7 @@ async fn forward_and_passthrough( } Err(e) => { let upstream_ms = upstream_start.elapsed().as_millis() as u64; - log_request( - &state.tui_state, + state.logger.emit( LogEntry::new(method.as_str(), log_path, 502, "upstream error") .upstream(url) .upstream_duration(upstream_ms) @@ -901,6 +892,7 @@ mod tests { .build() .unwrap(), tui_state: crate::tui::new_shared_state(), + logger: crate::logger::LogRouter::start(vec![]).router, sessions: crate::session::MemorySessionStore::new(), max_request_body: max_request, max_response_body: max_response, diff --git a/src/tui/state.rs b/src/tui/state.rs index aa6f9fc..ca35cd6 100644 --- a/src/tui/state.rs +++ b/src/tui/state.rs @@ -2,6 +2,8 @@ use std::collections::VecDeque; use std::sync::{Arc, Mutex}; use std::time::Instant; +pub use crate::logger::LogEntry; + const MAX_LOG_ENTRIES: usize = 10_000; #[derive(Clone, Copy, PartialEq)] @@ -34,95 +36,6 @@ impl ConnectionStatus { } } -pub struct LogEntry { - pub timestamp: String, - pub method: String, - pub path: String, - pub mcp_method: Option, - pub session_id: Option, - pub status: u16, - pub note: String, - pub upstream_url: Option, - pub resp_size: Option, - pub duration_ms: Option, - /// Time spent waiting for upstream (network). Proxy overhead = duration_ms - upstream_ms. - pub upstream_ms: Option, - /// JSON-RPC error code from the response body (if the response is a JSON-RPC error). - pub jsonrpc_error: Option<(i64, String)>, - /// Extra detail: tool name for tools/call, resource URI for resources/read, etc. - pub detail: Option, -} - -impl LogEntry { - pub fn new(method: &str, path: &str, status: u16, note: &str) -> Self { - Self { - timestamp: chrono::Local::now().format("%H:%M:%S").to_string(), - method: method.to_string(), - path: path.to_string(), - mcp_method: None, - session_id: None, - status, - note: note.to_string(), - upstream_url: None, - resp_size: None, - duration_ms: None, - upstream_ms: None, - jsonrpc_error: None, - detail: None, - } - } - - pub fn session_id(mut self, id: &str) -> Self { - self.session_id = Some(id.to_string()); - self - } - - pub fn maybe_session_id(mut self, id: Option<&str>) -> Self { - self.session_id = id.map(String::from); - self - } - - pub fn mcp_method(mut self, m: &str) -> Self { - self.mcp_method = Some(m.to_string()); - self - } - - pub fn upstream(mut self, url: &str) -> Self { - self.upstream_url = Some(url.to_string()); - self - } - - pub fn size(mut self, bytes: usize) -> Self { - self.resp_size = Some(bytes); - self - } - - pub fn duration(mut self, start: Instant) -> Self { - self.duration_ms = Some(start.elapsed().as_millis() as u64); - self - } - - pub fn upstream_duration(mut self, ms: u64) -> Self { - self.upstream_ms = Some(ms); - self - } - - pub fn jsonrpc_error(mut self, code: i64, message: &str) -> Self { - self.jsonrpc_error = Some((code, message.to_string())); - self - } - - pub fn detail(mut self, d: &str) -> Self { - self.detail = Some(d.to_string()); - self - } - - pub fn maybe_detail(mut self, d: Option<&str>) -> Self { - self.detail = d.map(String::from); - self - } -} - pub struct TuiState { // Info panel pub proxy_url: String, diff --git a/src/widgets.rs b/src/widgets.rs index bc23f6f..4b60fee 100644 --- a/src/widgets.rs +++ b/src/widgets.rs @@ -6,8 +6,7 @@ use std::path::{Path, PathBuf}; use std::time::Duration; use crate::AppState; -use crate::display::log_request; -use crate::tui::state::LogEntry; +use crate::logger::LogEntry; // ── Types ─────────────────────────────────────────────── @@ -43,8 +42,7 @@ pub async fn serve_widget_asset(state: &AppState, path: &str) -> Response { let status_code = StatusCode::from_u16(status).unwrap_or(StatusCode::BAD_GATEWAY); let bytes = resp.bytes().await.unwrap_or_default(); - log_request( - &state.tui_state, + state.logger.emit( LogEntry::new("GET", path, status, "widget") .upstream(&url) .size(bytes.len()), @@ -52,10 +50,9 @@ pub async fn serve_widget_asset(state: &AppState, path: &str) -> Response { (status_code, headers, bytes).into_response() } Err(e) => { - log_request( - &state.tui_state, - LogEntry::new("GET", path, 502, "widget error").upstream(&url), - ); + state + .logger + .emit(LogEntry::new("GET", path, 502, "widget error").upstream(&url)); (StatusCode::BAD_GATEWAY, format!("Widget proxy error: {e}")).into_response() } } @@ -64,8 +61,7 @@ pub async fn serve_widget_asset(state: &AppState, path: &str) -> Response { let file_path = PathBuf::from(dir).join(path.trim_start_matches('/')); match tokio::fs::read(&file_path).await { Ok(bytes) => { - log_request( - &state.tui_state, + state.logger.emit( LogEntry::new("GET", path, 200, "widget") .upstream(file_path.to_str().unwrap_or(path)) .size(bytes.len()), @@ -77,19 +73,17 @@ pub async fn serve_widget_asset(state: &AppState, path: &str) -> Response { (StatusCode::OK, headers, bytes).into_response() } Err(_) => { - log_request( - &state.tui_state, - LogEntry::new("GET", path, 404, "not found"), - ); + state + .logger + .emit(LogEntry::new("GET", path, 404, "not found")); StatusCode::NOT_FOUND.into_response() } } } None => { - log_request( - &state.tui_state, - LogEntry::new("GET", path, 404, "no widget source"), - ); + state + .logger + .emit(LogEntry::new("GET", path, 404, "no widget source")); StatusCode::NOT_FOUND.into_response() } } @@ -141,15 +135,16 @@ pub async fn serve_widget_html(state: &AppState, name: &str, raw: bool) -> Respo } let Some(html) = fetch_widget_html(state, name).await else { - log_request( - &state.tui_state, - LogEntry::new("GET", &format!("/widgets/{name}.html"), 404, "not found"), - ); + state.logger.emit(LogEntry::new( + "GET", + &format!("/widgets/{name}.html"), + 404, + "not found", + )); return (StatusCode::NOT_FOUND, format!("Widget '{name}' not found")).into_response(); }; - log_request( - &state.tui_state, + state.logger.emit( LogEntry::new("GET", &format!("/widgets/{name}.html"), 200, "widget raw").size(html.len()), ); let mut headers = HeaderMap::new(); @@ -174,10 +169,12 @@ pub async fn list_widgets(state: &AppState) -> Response { }) }).collect::>(), }); - log_request( - &state.tui_state, - LogEntry::new("GET", "/widgets", 200, &format!("{} widgets", names.len())), - ); + state.logger.emit(LogEntry::new( + "GET", + "/widgets", + 200, + &format!("{} widgets", names.len()), + )); let mut headers = HeaderMap::new(); headers.insert(header::CONTENT_TYPE, "application/json".parse().unwrap()); ( From aa6f19f6d57e0d85e9eb679ee61428819fa34ba0 Mon Sep 17 00:00:00 2001 From: cptrodgers Date: Mon, 30 Mar 2026 21:33:21 +0700 Subject: [PATCH 2/2] Use ring buffer and flush --- src/logger/file_sink.rs | 161 +++++++++++++++++++++++++++++++--------- src/logger/mod.rs | 47 ++++++++++-- src/logger/sink.rs | 14 +++- src/main.rs | 6 +- 4 files changed, 184 insertions(+), 44 deletions(-) diff --git a/src/logger/file_sink.rs b/src/logger/file_sink.rs index ea2c36b..e1cfbd5 100644 --- a/src/logger/file_sink.rs +++ b/src/logger/file_sink.rs @@ -33,12 +33,43 @@ pub struct FileSinkConfig { /// Maximum number of log files to keep. Oldest are deleted when exceeded. /// Defaults to 10. This caps total disk usage to roughly `max_files * rotation_size`. pub max_files: usize, + /// Filename prefix, typically derived from the MCP upstream identity. + /// e.g. "mcpr-localhost-9000" → `mcpr-localhost-9000-2026-03-30.log` + /// Defaults to "mcpr" if empty. + pub prefix: String, +} + +/// Derive a filesystem-safe prefix from an MCP upstream URL. +/// `http://localhost:9000/mcp` → `mcpr-localhost-9000` +/// `https://api.example.com` → `mcpr-api.example.com` +pub fn prefix_from_upstream(url: &str) -> String { + let stripped = url + .trim_start_matches("https://") + .trim_start_matches("http://"); + // Take host:port, drop path + let host_port = stripped.split('/').next().unwrap_or(stripped); + // Replace unsafe filesystem chars + let safe: String = host_port + .chars() + .map(|c| { + if c.is_alphanumeric() || c == '-' { + c + } else { + '-' + } + }) + .collect(); + format!("mcpr-{safe}") } /// JSONL file sink with buffered writes and rotation. /// /// Writes one JSON object per line to `{dir}/mcpr-{date}.log`. /// Rotation happens by size or daily, depending on config. +/// +/// Supports batch writes via `emit_batch()` for high-throughput scenarios. +/// The `LogRouter` drains the channel in batches and calls `emit_batch()` +/// to amortize the mutex acquisition and disk I/O cost. pub struct FileSink { config: FileSinkConfig, inner: Mutex, @@ -54,7 +85,7 @@ struct FileSinkInner { impl FileSink { pub fn new(config: FileSinkConfig) -> std::io::Result { fs::create_dir_all(&config.dir)?; - let (path, date) = Self::log_path(&config.dir); + let (path, date) = Self::log_path(&config.dir, &config.prefix); let file = Self::open_append(&path)?; let bytes_written = file.metadata().map(|m| m.len()).unwrap_or(0); Ok(Self { @@ -68,9 +99,9 @@ impl FileSink { }) } - fn log_path(dir: &Path) -> (PathBuf, String) { + fn log_path(dir: &Path, prefix: &str) -> (PathBuf, String) { let date = chrono::Utc::now().format("%Y-%m-%d").to_string(); - let path = dir.join(format!("mcpr-{date}.log")); + let path = dir.join(format!("{prefix}-{date}.log")); (path, date) } @@ -94,16 +125,13 @@ impl FileSink { match &self.config.rotation { Rotation::Size(_) => { - // Rename current file with a sequence number - let mut seq = 1u32; - loop { - let rotated = inner.current_path.with_extension(format!("{seq}.log")); - if !rotated.exists() { - fs::rename(&inner.current_path, &rotated)?; - break; - } - seq += 1; - } + // Rename current file with UTC timestamp + let ts = chrono::Utc::now().format("%Y-%m-%dT%H-%M-%SZ"); + let rotated = self + .config + .dir + .join(format!("{}-{ts}.log", self.config.prefix)); + fs::rename(&inner.current_path, &rotated)?; // Re-open same path (now empty) let file = Self::open_append(&inner.current_path)?; inner.writer = BufWriter::with_capacity(BUFFER_CAPACITY, file); @@ -111,7 +139,7 @@ impl FileSink { } Rotation::Daily => { // Open new file with today's date - let (path, date) = Self::log_path(&self.config.dir); + let (path, date) = Self::log_path(&self.config.dir, &self.config.prefix); let existing_size = path.metadata().map(|m| m.len()).unwrap_or(0); let file = Self::open_append(&path)?; inner.current_path = path; @@ -138,9 +166,10 @@ impl FileSink { .filter_map(|e| e.ok()) .map(|e| e.path()) .filter(|p| { + let prefix = &self.config.prefix; p.file_name() .and_then(|n| n.to_str()) - .is_some_and(|n| n.starts_with("mcpr-") && n.ends_with(".log")) + .is_some_and(|n| n.starts_with(&format!("{prefix}-")) && n.ends_with(".log")) }) .collect(); @@ -156,15 +185,11 @@ impl FileSink { let _ = fs::remove_file(path); } } -} -impl LogSink for FileSink { - fn emit(&self, entry: &LogEntry) { - let mut inner = self.inner.lock().unwrap(); - - // Check rotation before writing - if self.should_rotate(&inner) { - match self.rotate(&mut inner) { + /// Write a single serialized line + check rotation. + fn write_line(&self, inner: &mut FileSinkInner, line: &str) { + if self.should_rotate(inner) { + match self.rotate(inner) { Ok(()) => self.cleanup_old_files(), Err(e) => { eprintln!("mcpr: log rotation failed: {e}"); @@ -173,19 +198,40 @@ impl LogSink for FileSink { } } - // Serialize and write - match serde_json::to_string(entry) { - Ok(line) => { - let bytes = line.len() as u64 + 1; // +1 for newline - if let Err(e) = writeln!(inner.writer, "{line}") { - eprintln!("mcpr: log write failed: {e}"); - } else { - inner.bytes_written += bytes; - } - } - Err(e) => { - eprintln!("mcpr: log serialize failed: {e}"); - } + let bytes = line.len() as u64 + 1; + if let Err(e) = writeln!(inner.writer, "{line}") { + eprintln!("mcpr: log write failed: {e}"); + } else { + inner.bytes_written += bytes; + } + } +} + +impl LogSink for FileSink { + fn emit(&self, entry: &LogEntry) { + let Ok(line) = serde_json::to_string(entry) else { + eprintln!("mcpr: log serialize failed"); + return; + }; + let mut inner = self.inner.lock().unwrap(); + self.write_line(&mut inner, &line); + } + + fn emit_batch(&self, entries: &[LogEntry]) { + // Pre-serialize outside the lock + let lines: Vec = entries + .iter() + .filter_map(|e| serde_json::to_string(e).ok()) + .collect(); + + if lines.is_empty() { + return; + } + + // Single lock acquisition for the entire batch + let mut inner = self.inner.lock().unwrap(); + for line in &lines { + self.write_line(&mut inner, line); } } @@ -212,6 +258,7 @@ mod tests { dir: dir.path().to_path_buf(), rotation: Rotation::Size(DEFAULT_MAX_FILE_SIZE), max_files: DEFAULT_MAX_FILES, + prefix: "mcpr-test".to_string(), }; let sink = FileSink::new(config).unwrap(); @@ -247,6 +294,44 @@ mod tests { assert_eq!(second["path"], "/health"); } + #[test] + fn batch_writes_atomically() { + let dir = TempDir::new().unwrap(); + let config = FileSinkConfig { + dir: dir.path().to_path_buf(), + rotation: Rotation::Size(DEFAULT_MAX_FILE_SIZE), + max_files: DEFAULT_MAX_FILES, + prefix: "mcpr-test".to_string(), + }; + let sink = FileSink::new(config).unwrap(); + + let batch: Vec = (0..5) + .map(|i| make_entry("POST", &format!("/path/{i}"), 200)) + .collect(); + + sink.emit_batch(&batch); + sink.flush(); + + let entries: Vec = fs::read_dir(dir.path()) + .unwrap() + .filter_map(|e| e.ok()) + .filter(|e| e.path().extension().is_some_and(|ext| ext == "log")) + .flat_map(|e| { + fs::read_to_string(e.path()) + .unwrap() + .lines() + .map(String::from) + .collect::>() + }) + .collect(); + + assert_eq!(entries.len(), 5); + for (i, line) in entries.iter().enumerate() { + let val: serde_json::Value = serde_json::from_str(line).unwrap(); + assert_eq!(val["path"], format!("/path/{i}")); + } + } + #[test] fn rotates_by_size() { let dir = TempDir::new().unwrap(); @@ -254,6 +339,7 @@ mod tests { dir: dir.path().to_path_buf(), rotation: Rotation::Size(100), // Very small threshold max_files: DEFAULT_MAX_FILES, + prefix: "mcpr-test".to_string(), }; let sink = FileSink::new(config).unwrap(); @@ -291,6 +377,7 @@ mod tests { dir: dir.path().to_path_buf(), rotation: Rotation::Size(DEFAULT_MAX_FILE_SIZE), max_files: DEFAULT_MAX_FILES, + prefix: "mcpr-test".to_string(), }; let sink = FileSink::new(config).unwrap(); @@ -338,6 +425,7 @@ mod tests { dir: nested.clone(), rotation: Rotation::Size(DEFAULT_MAX_FILE_SIZE), max_files: DEFAULT_MAX_FILES, + prefix: "mcpr-test".to_string(), }; let sink = FileSink::new(config).unwrap(); @@ -354,6 +442,7 @@ mod tests { dir: dir.path().to_path_buf(), rotation: Rotation::Size(100), // Very small to trigger many rotations max_files: 3, + prefix: "mcpr-test".to_string(), }; let sink = FileSink::new(config).unwrap(); diff --git a/src/logger/mod.rs b/src/logger/mod.rs index 28708c7..9760d17 100644 --- a/src/logger/mod.rs +++ b/src/logger/mod.rs @@ -4,13 +4,15 @@ mod sink; mod tui_sink; pub use entry::LogEntry; -pub use file_sink::{DEFAULT_MAX_FILES, FileSink, FileSinkConfig, Rotation}; +pub use file_sink::{DEFAULT_MAX_FILES, FileSink, FileSinkConfig, Rotation, prefix_from_upstream}; pub use sink::LogSink; pub use tui_sink::TuiSink; use tokio::sync::mpsc; const CHANNEL_CAPACITY: usize = 4096; +const BATCH_SIZE: usize = 256; +const FLUSH_INTERVAL_MS: u64 = 5000; /// Routes log entries to multiple sinks via a bounded async channel. /// @@ -67,21 +69,44 @@ async fn router_task( mut shutdown_rx: mpsc::Receiver<()>, sinks: Vec>, ) { + let mut batch = Vec::with_capacity(BATCH_SIZE); + let mut flush_interval = + tokio::time::interval(tokio::time::Duration::from_millis(FLUSH_INTERVAL_MS)); + flush_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + loop { tokio::select! { Some(entry) = rx.recv() => { + batch.push(entry); + + // Drain more entries if available (non-blocking) + while batch.len() < BATCH_SIZE { + match rx.try_recv() { + Ok(entry) => batch.push(entry), + Err(_) => break, + } + } + + // Dispatch the batch + dispatch_batch(&sinks, &batch); + batch.clear(); + } + _ = flush_interval.tick() => { + // Periodic flush for sinks with internal buffers for sink in &sinks { - sink.emit(&entry); + sink.flush(); } } _ = shutdown_rx.recv() => { // Drain remaining entries while let Ok(entry) = rx.try_recv() { - for sink in &sinks { - sink.emit(&entry); - } + batch.push(entry); + } + if !batch.is_empty() { + dispatch_batch(&sinks, &batch); + batch.clear(); } - // Flush all sinks + // Final flush for sink in &sinks { sink.flush(); } @@ -91,6 +116,16 @@ async fn router_task( } } +fn dispatch_batch(sinks: &[Box], batch: &[LogEntry]) { + for sink in sinks { + if batch.len() == 1 { + sink.emit(&batch[0]); + } else { + sink.emit_batch(batch); + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/logger/sink.rs b/src/logger/sink.rs index faee87e..5be5049 100644 --- a/src/logger/sink.rs +++ b/src/logger/sink.rs @@ -3,11 +3,23 @@ use super::entry::LogEntry; /// Trait for log output backends. /// /// Implement this to add new log destinations (file, database, metrics, etc.). -/// Sinks receive cloned `LogEntry` values from the `LogRouter` background task. +/// Sinks receive `LogEntry` values from the `LogRouter` background task. +/// +/// For high-throughput scenarios, override `emit_batch()` to amortize +/// per-entry overhead (mutex acquisition, disk I/O, network calls). pub trait LogSink: Send + Sync + 'static { /// Process a single log entry. Called from the router's background task. fn emit(&self, entry: &LogEntry); + /// Process a batch of log entries. Default implementation calls `emit()` + /// for each entry. Override for sinks that benefit from batching + /// (e.g., file I/O, database inserts). + fn emit_batch(&self, entries: &[LogEntry]) { + for entry in entries { + self.emit(entry); + } + } + /// Flush any buffered data. Called during graceful shutdown. fn flush(&self) {} } diff --git a/src/main.rs b/src/main.rs index a819f39..96d996b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -21,7 +21,10 @@ use tower_http::cors::{Any, CorsLayer}; use config::{GatewayConfig, Mode}; use display::log_startup; -use logger::{DEFAULT_MAX_FILES, FileSink, FileSinkConfig, LogRouter, LogSink, Rotation, TuiSink}; +use logger::{ + DEFAULT_MAX_FILES, FileSink, FileSinkConfig, LogRouter, LogSink, Rotation, TuiSink, + prefix_from_upstream, +}; use proxy::proxy_routes; use rewrite::RewriteConfig; use session::MemorySessionStore; @@ -220,6 +223,7 @@ async fn run_gateway(cfg: GatewayConfig) { dir: std::path::PathBuf::from(&dir), rotation, max_files: DEFAULT_MAX_FILES, + prefix: prefix_from_upstream(&mcp), }) { Ok(sink) => { sinks.push(Box::new(sink));