From 7b2f888547f4e3ccd7298352421ffb8f60686376 Mon Sep 17 00:00:00 2001 From: Mingwei Zhang Date: Sun, 15 Mar 2026 11:50:05 -0700 Subject: [PATCH 1/2] feat: add rib reconstruction command - add rib reconstruction for arbitrary timestamps using base RIBs and replayed updates - support stdout and SQLite output with country and full-feed filters - document rib command behavior in the README and changelog --- CHANGELOG.md | 16 + Cargo.lock | 1 + Cargo.toml | 4 + README.md | 61 ++ src/bin/commands/elem_format.rs | 25 + src/bin/commands/mod.rs | 1 + src/bin/commands/rib.rs | 167 +++++ src/bin/monocle.rs | 7 + src/database/mod.rs | 5 + src/database/session/mod.rs | 7 + src/database/session/rib_store.rs | 505 +++++++++++++ src/lens/mod.rs | 4 + src/lens/rib/mod.rs | 1143 +++++++++++++++++++++++++++++ 13 files changed, 1946 insertions(+) create mode 100644 src/bin/commands/rib.rs create mode 100644 src/database/session/rib_store.rs create mode 100644 src/lens/rib/mod.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 92119c7..861a05c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,22 @@ All notable changes to this project will be documented in this file. +## Unreleased changes + +### New Features + +* Added `monocle rib` for reconstructing RIB state at arbitrary timestamps + * Selects the latest RIB before each requested `rib_ts` and replays updates to the exact timestamp + * Supports stdout output by default and merged SQLite output + * Repeated `--ts` values are written to one merged SQLite file keyed by `rib_ts` + * Aborts when no RIB exists at or before a requested `rib_ts` for a selected collector + * Supports `--country`, `--origin-asn`, `--prefix`, `--as-path`, `--peer-asn`, `--collector`, `--project`, and `--full-feed-only` + * Auto-generated output filenames include requested timestamps and normalized filter slugs + +### Code Improvements + +* Added session-backed SQLite stores for reconstructed RIB working state and merged SQLite export + ## v1.2.0 - 2026-02-28 ### New Features diff --git a/Cargo.lock b/Cargo.lock index 06fd17d..e45f46f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1673,6 +1673,7 @@ dependencies = [ "oneio", "radar-rs", "rayon", + "regex", "rusqlite", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index 20f841c..195b746 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -88,6 +88,7 @@ lib = [ # Database "dep:oneio", "dep:ipnet", + "dep:tempfile", # Lenses "dep:chrono-humanize", "dep:dateparser", @@ -98,6 +99,7 @@ lib = [ "dep:itertools", "dep:radar-rs", "dep:rayon", + "dep:regex", # Display (always included with lib) "dep:tabled", "dep:json_to_table", @@ -151,6 +153,7 @@ tracing = "0.1" # Database ipnet = { version = "2.10", features = ["json"], optional = true } oneio = { version = "0.20.1", default-features = false, features = ["https", "gz", "bz", "json"], optional = true } +tempfile = { version = "3", optional = true } # Lenses chrono-humanize = { version = "0.2", optional = true } @@ -162,6 +165,7 @@ bgpkit-commons = { version = "0.10.2", features = ["asinfo", "rpki", "countries" itertools = { version = "0.14", optional = true } radar-rs = { version = "0.1.0", optional = true } rayon = { version = "1.8", optional = true } +regex = { version = "1.11", optional = true } # Display tabled = { version = "0.20", optional = true } diff --git a/README.md b/README.md index 18d14c4..946af0d 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,7 @@ See through all Border Gateway Protocol (BGP) data with a monocle. - [`monocle parse`](#monocle-parse) - [Output Format](#output-format) - [`monocle search`](#monocle-search) + - [`monocle rib`](#monocle-rib) - [`monocle time`](#monocle-time) - [`monocle inspect`](#monocle-inspect) - [`monocle country`](#monocle-country) @@ -229,6 +230,7 @@ Subcommands: - `parse`: parse individual MRT files - `search`: search for matching messages from all available public MRT files +- `rib`: reconstruct final RIB state at one or more arbitrary timestamps - `server`: start a WebSocket server for programmatic access - `inspect`: unified AS and prefix information lookup - `country`: utility to look up country name and code @@ -259,6 +261,7 @@ Usage: monocle [OPTIONS] Commands: parse Parse individual MRT files given a file path, local or remote search Search BGP messages from all available public MRT files + rib Reconstruct final RIB state at one or more arbitrary timestamps server Start the WebSocket server (ws://
:/ws, health: http://
:/health) inspect Unified AS and prefix information lookup country Country name and code lookup utilities @@ -701,6 +704,64 @@ Use `--broker-files` to see the list of MRT files that would be queried without -c rrc00 --broker-files ``` +### `monocle rib` + +Reconstruct final RIB state at one or more arbitrary timestamps by loading the latest RIB at or before each requested `rib_ts` and replaying updates up to the exact timestamp. + +```text +➜ monocle rib --help +Reconstruct final RIB state at one or more arbitrary timestamps + +Usage: monocle rib [OPTIONS] --ts + +Options: + --ts Target RIB timestamp. Repeat to request multiple snapshots + --debug Print debug information + -o, --origin-asn Filter by origin AS Number(s), comma-separated. Prefix with ! to exclude + -C, --country Filter by origin ASN registration country + --format Output format: table, markdown, json, json-pretty, json-line, psv (default varies by command) + --json Output as JSON objects (shortcut for --format json-pretty) + -p, --prefix Filter by network prefix(es), comma-separated. Prefix with ! to exclude + --no-update Disable automatic database updates (use existing cached data only) + -s, --include-super Include super-prefixes when filtering + -S, --include-sub Include sub-prefixes when filtering + -J, --peer-asn Filter by peer ASN(s), comma-separated. Prefix with ! to exclude + -a, --as-path Filter by AS path regex string + -c, --collector Filter by collector, e.g., rrc00 or route-views2 + -P, --project Filter by route collection project, i.e. riperis or routeviews + --full-feed-only Keep only full-feed peers based on broker peer metadata + --output-type File output type. If omitted and `--output-dir` is also omitted, output goes to stdout [possible values: sqlite] + --output-dir Output directory for generated SQLite files + -h, --help Print help + -V, --version Print version +``` + +Behavior: + +- A single `--ts` writes to stdout by default. +- Repeated `--ts` values require file output and are written to one merged SQLite file keyed by `rib_ts`. +- If any selected collector has no RIB at or before a requested `rib_ts`, the command aborts instead of producing a partial result. +- `--country` uses local ASInfo registration data, and `--full-feed-only` keeps only peers with at least 800k IPv4 prefixes or 100k IPv6 prefixes in broker peer metadata. + +Examples: + +```bash +# Print the reconstructed RIB for a single timestamp to stdout +monocle rib --ts 2025-09-01T12:00:00Z -c rrc00 -o 13335 + +# Write multiple timestamps to one merged SQLite file in the current directory +monocle rib \ + --ts 2025-09-01T12:00:00Z \ + --ts 2025-09-01T18:00:00Z \ + --output-type sqlite \ + -c rrc00 \ + --country US \ + --full-feed-only + +# Override the output directory +monocle rib --ts 2025-09-01T12:00:00Z --output-dir /tmp/rib-out -c route-views2 +``` + ### `monocle time` Parse and convert time strings between various formats. diff --git a/src/bin/commands/elem_format.rs b/src/bin/commands/elem_format.rs index edc4d20..452029d 100644 --- a/src/bin/commands/elem_format.rs +++ b/src/bin/commands/elem_format.rs @@ -17,7 +17,9 @@ pub const AVAILABLE_FIELDS: &[&str] = &[ "peer_ip", "peer_asn", "prefix", + "path_id", "as_path", + "origin_asns", "origin", "next_hop", "local_pref", @@ -174,11 +176,26 @@ pub fn get_field_value_with_time_format( "peer_ip" => elem.peer_ip.to_string(), "peer_asn" => elem.peer_asn.to_string(), "prefix" => elem.prefix.to_string(), + "path_id" => elem + .prefix + .path_id + .map(|path_id| path_id.to_string()) + .unwrap_or_default(), "as_path" => elem .as_path .as_ref() .map(|p| p.to_string()) .unwrap_or_default(), + "origin_asns" => elem + .origin_asns + .as_ref() + .map(|asns| { + asns.iter() + .map(|asn| asn.to_string()) + .collect::>() + .join(" ") + }) + .unwrap_or_default(), "origin" => elem .origin .as_ref() @@ -288,10 +305,18 @@ pub fn build_json_object( "peer_ip" => json!(elem.peer_ip.to_string()), "peer_asn" => json!(elem.peer_asn), "prefix" => json!(elem.prefix.to_string()), + "path_id" => match elem.prefix.path_id { + Some(path_id) => json!(path_id), + None => serde_json::Value::Null, + }, "as_path" => match &elem.as_path { Some(p) => json!(p.to_string()), None => serde_json::Value::Null, }, + "origin_asns" => match &elem.origin_asns { + Some(asns) => json!(asns.iter().map(|asn| asn.to_string()).collect::>()), + None => serde_json::Value::Null, + }, "origin" => match &elem.origin { Some(o) => json!(o.to_string()), None => serde_json::Value::Null, diff --git a/src/bin/commands/mod.rs b/src/bin/commands/mod.rs index 8f7a72e..4f5d0f0 100644 --- a/src/bin/commands/mod.rs +++ b/src/bin/commands/mod.rs @@ -6,6 +6,7 @@ pub mod inspect; pub mod ip; pub mod parse; pub mod pfx2as; +pub mod rib; pub mod rpki; pub mod search; pub mod time; diff --git a/src/bin/commands/rib.rs b/src/bin/commands/rib.rs new file mode 100644 index 0000000..1aef1e8 --- /dev/null +++ b/src/bin/commands/rib.rs @@ -0,0 +1,167 @@ +use std::fs; +use std::io::Write; +use std::path::{Path, PathBuf}; + +use anyhow::{anyhow, Result}; +use bgpkit_parser::BgpElem; + +use monocle::database::{MonocleDatabase, RibSqliteStore}; +use monocle::lens::rib::{RibLens, RibOutputType}; +use monocle::utils::{OutputFormat, TimestampFormat}; +use monocle::MonocleConfig; + +use super::elem_format::{format_elem, format_elems_table, get_header}; + +pub use monocle::lens::rib::RibArgs; + +const DEFAULT_FIELDS_RIB: &[&str] = &[ + "type", + "timestamp", + "peer_ip", + "peer_asn", + "prefix", + "path_id", + "as_path", + "origin_asns", + "origin", + "next_hop", + "local_pref", + "med", + "communities", + "atomic", + "aggr_asn", + "aggr_ip", + "collector", +]; + +pub fn run(config: &MonocleConfig, args: RibArgs, output_format: OutputFormat, no_update: bool) { + if let Err(error) = run_inner(config, args, output_format, no_update) { + eprintln!("ERROR: {}", error); + std::process::exit(1); + } +} + +fn run_inner( + config: &MonocleConfig, + args: RibArgs, + output_format: OutputFormat, + no_update: bool, +) -> Result<()> { + let sqlite_path = config.sqlite_path(); + let db = MonocleDatabase::open(&sqlite_path) + .map_err(|e| anyhow!("Failed to open database '{}': {}", sqlite_path, e))?; + let lens = RibLens::new(&db, config); + + match args.file_output_type() { + None => run_stdout(&lens, &args, output_format, no_update), + Some(RibOutputType::Sqlite) => run_sqlite_output(&lens, &args, no_update), + } +} + +fn run_stdout( + lens: &RibLens<'_>, + args: &RibArgs, + output_format: OutputFormat, + no_update: bool, +) -> Result<()> { + let mut stdout = std::io::stdout(); + + if output_format == OutputFormat::Table { + let mut elems = Vec::<(BgpElem, Option)>::new(); + lens.reconstruct_snapshots(args, no_update, |_rib_ts, state_store| { + state_store.visit_entries(|entry| { + elems.push((entry.elem, Some(entry.collector))); + Ok(()) + }) + })?; + + if !elems.is_empty() { + writeln!( + stdout, + "{}", + format_elems_table(&elems, DEFAULT_FIELDS_RIB, TimestampFormat::Unix) + ) + .map_err(|e| anyhow!("Failed to write table output: {}", e))?; + } + return Ok(()); + } + + let mut header_written = false; + lens.reconstruct_snapshots(args, no_update, |_rib_ts, state_store| { + if !header_written { + if let Some(header) = get_header(output_format, DEFAULT_FIELDS_RIB) { + writeln!(stdout, "{}", header) + .map_err(|e| anyhow!("Failed to write output header: {}", e))?; + } + header_written = true; + } + + state_store.visit_entries(|entry| { + if let Some(line) = format_elem( + &entry.elem, + output_format, + DEFAULT_FIELDS_RIB, + Some(entry.collector.as_str()), + TimestampFormat::Unix, + ) { + writeln!(stdout, "{}", line) + .map_err(|e| anyhow!("Failed to write reconstructed RIB row: {}", e))?; + } + Ok(()) + }) + })?; + + Ok(()) +} + +fn run_sqlite_output(lens: &RibLens<'_>, args: &RibArgs, no_update: bool) -> Result<()> { + let normalized_ts = args.validate()?; + let output_dir = ensure_output_dir(lens.output_directory(args)?)?; + let output_path = output_dir.join(format!( + "{}.sqlite3", + lens.file_name_prefix(args, &normalized_ts)? + )); + + remove_existing_file(&output_path)?; + + let sqlite_store = RibSqliteStore::new(path_to_str(&output_path)?, true)?; + let summary = lens.reconstruct_snapshots(args, no_update, |rib_ts, state_store| { + state_store.visit_entries(|entry| sqlite_store.insert_entry(rib_ts, &entry)) + })?; + + eprintln!( + "wrote {} reconstructed RIB snapshot(s) to {}", + summary.rib_ts.len(), + output_path.display() + ); + Ok(()) +} + +fn ensure_output_dir(path: Option) -> Result { + let output_dir = path.ok_or_else(|| anyhow!("Failed to resolve output directory"))?; + fs::create_dir_all(&output_dir).map_err(|e| { + anyhow!( + "Failed to create output directory '{}': {}", + output_dir.display(), + e + ) + })?; + Ok(output_dir) +} + +fn remove_existing_file(path: &Path) -> Result<()> { + match fs::remove_file(path) { + Ok(()) => Ok(()), + Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(()), + Err(error) => Err(anyhow!( + "Failed to remove existing output file '{}': {}", + path.display(), + error + )), + } +} + +fn path_to_str(path: &Path) -> Result<&str> { + path.to_str() + .ok_or_else(|| anyhow!("Path '{}' contains invalid UTF-8", path.display())) +} diff --git a/src/bin/monocle.rs b/src/bin/monocle.rs index 20dac06..c5cf360 100644 --- a/src/bin/monocle.rs +++ b/src/bin/monocle.rs @@ -17,6 +17,7 @@ use commands::inspect::InspectArgs; use commands::ip::IpArgs; use commands::parse::ParseArgs; use commands::pfx2as::Pfx2asArgs; +use commands::rib::RibArgs; use commands::rpki::RpkiCommands; use commands::search::SearchArgs; use commands::time::TimeArgs; @@ -57,6 +58,9 @@ enum Commands { /// Search BGP messages from all available public MRT files. Search(SearchArgs), + /// Reconstruct final RIB state at one or more arbitrary timestamps. + Rib(RibArgs), + /// Start the WebSocket server (ws://
:/ws, health: http://
:/health) /// /// Note: This requires building with the `server` feature enabled. @@ -176,6 +180,9 @@ fn main() { match cli.command { Commands::Parse(args) => commands::parse::run(args, streaming_output_format), Commands::Search(args) => commands::search::run(&config, args, streaming_output_format), + Commands::Rib(args) => { + commands::rib::run(&config, args, streaming_output_format, cli.no_update) + } Commands::Server(args) => { // The server requires the `server` feature (axum + tokio). Keep the CLI diff --git a/src/database/mod.rs b/src/database/mod.rs index ed01451..8825d19 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -125,6 +125,11 @@ pub use monocle::{ // Requires lib feature because MsgStore depends on bgpkit_parser::BgpElem #[cfg(feature = "lib")] pub use session::MsgStore; +#[cfg(feature = "lib")] +pub use session::{ + elem_matches_stored_json, path_id_for_key, path_id_from_key, RibRouteKey, RibSqliteStore, + RibStateStore, StoredRibEntry, +}; // ============================================================================= // Helper function diff --git a/src/database/session/mod.rs b/src/database/session/mod.rs index d69b267..f62c246 100644 --- a/src/database/session/mod.rs +++ b/src/database/session/mod.rs @@ -15,6 +15,13 @@ #[cfg(feature = "lib")] mod msg_store; +#[cfg(feature = "lib")] +mod rib_store; #[cfg(feature = "lib")] pub use msg_store::MsgStore; +#[cfg(feature = "lib")] +pub use rib_store::{ + elem_matches_stored_json, path_id_for_key, path_id_from_key, RibRouteKey, RibSqliteStore, + RibStateStore, StoredRibEntry, +}; diff --git a/src/database/session/rib_store.rs b/src/database/session/rib_store.rs new file mode 100644 index 0000000..ec11255 --- /dev/null +++ b/src/database/session/rib_store.rs @@ -0,0 +1,505 @@ +//! Session-based SQLite stores for reconstructed RIB snapshots. +//! +//! These stores are separate from `MsgStore` because RIB reconstruction needs: +//! - route-identity keys with `path_id` +//! - exact `BgpElem` round-tripping for reconstructed RIB state +//! - merged SQLite output keyed by `rib_ts` + +use anyhow::{anyhow, Result}; +use bgpkit_parser::BgpElem; +use rusqlite::{params, OptionalExtension}; +use serde_json::Value; +use tempfile::{NamedTempFile, TempPath}; + +use crate::database::core::DatabaseConn; + +fn opt_to_sql_i64(v: Option) -> i64 { + v.map(i64::from).unwrap_or(-1) +} + +fn sql_i64_to_opt(v: i64) -> Option { + if v < 0 { + None + } else { + u32::try_from(v).ok() + } +} + +fn elem_as_path(elem: &BgpElem) -> Option { + elem.as_path.as_ref().map(|path| path.to_string()) +} + +fn elem_origin_asns(elem: &BgpElem) -> Option { + elem.origin_asns.as_ref().map(|asns| { + asns.iter() + .map(|asn| asn.to_string()) + .collect::>() + .join(" ") + }) +} + +fn elem_next_hop(elem: &BgpElem) -> Option { + elem.next_hop.as_ref().map(|hop| hop.to_string()) +} + +fn elem_communities(elem: &BgpElem) -> Option { + elem.communities.as_ref().map(|communities| { + communities + .iter() + .map(|community| community.to_string()) + .collect::>() + .join(" ") + }) +} + +fn elem_origin(elem: &BgpElem) -> Option { + elem.origin.as_ref().map(|origin| origin.to_string()) +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct RibRouteKey { + pub collector: String, + pub peer_ip: String, + pub peer_asn: u32, + pub prefix: String, + pub path_id: Option, +} + +impl RibRouteKey { + pub fn from_elem(collector: &str, elem: &BgpElem) -> Self { + Self { + collector: collector.to_string(), + peer_ip: elem.peer_ip.to_string(), + peer_asn: elem.peer_asn.to_u32(), + prefix: elem.prefix.prefix.to_string(), + path_id: elem.prefix.path_id, + } + } +} + +#[derive(Debug, Clone)] +pub struct StoredRibEntry { + pub collector: String, + pub elem: BgpElem, +} + +impl StoredRibEntry { + pub fn new(collector: impl Into, elem: BgpElem) -> Self { + Self { + collector: collector.into(), + elem, + } + } + + pub fn route_key(&self) -> RibRouteKey { + RibRouteKey::from_elem(&self.collector, &self.elem) + } + + fn elem_json(&self) -> Result { + serde_json::to_string(&self.elem) + .map_err(|e| anyhow!("Failed to serialize BgpElem for SQLite storage: {}", e)) + } + + fn from_row(row: &rusqlite::Row<'_>) -> Result { + let collector: String = row + .get("collector") + .map_err(|e| anyhow!("Failed to read collector column: {}", e))?; + let elem_json: String = row + .get("elem_json") + .map_err(|e| anyhow!("Failed to read elem_json column: {}", e))?; + let elem = serde_json::from_str::(&elem_json) + .map_err(|e| anyhow!("Failed to deserialize stored BgpElem JSON: {}", e))?; + Ok(Self { collector, elem }) + } +} + +pub struct RibStateStore { + db: DatabaseConn, + _temp_path: Option, +} + +impl RibStateStore { + pub fn new(db_path: Option<&str>, reset: bool) -> Result { + let db = DatabaseConn::open(db_path)?; + let store = Self { + db, + _temp_path: None, + }; + store.initialize(reset)?; + Ok(store) + } + + pub fn new_temp() -> Result { + let file = NamedTempFile::new().map_err(|e| { + anyhow!( + "Failed to create temporary SQLite path for rib state: {}", + e + ) + })?; + let temp_path = file.into_temp_path(); + let db = DatabaseConn::open_path( + temp_path + .to_str() + .ok_or_else(|| anyhow!("Temporary rib state path contains invalid UTF-8"))?, + )?; + let store = Self { + db, + _temp_path: Some(temp_path), + }; + store.initialize(true)?; + Ok(store) + } + + fn initialize(&self, reset: bool) -> Result<()> { + if reset { + self.db + .conn + .execute("DROP TABLE IF EXISTS rib_state", []) + .map_err(|e| anyhow!("Failed to drop rib_state table: {}", e))?; + } + + self.db + .conn + .execute_batch( + r#" + CREATE TABLE IF NOT EXISTS rib_state ( + collector TEXT NOT NULL, + peer_ip TEXT NOT NULL, + peer_asn INTEGER NOT NULL, + prefix TEXT NOT NULL, + path_id INTEGER NOT NULL, + timestamp REAL NOT NULL, + as_path TEXT, + origin_asns TEXT, + origin TEXT, + next_hop TEXT, + local_pref INTEGER, + med INTEGER, + communities TEXT, + atomic INTEGER NOT NULL, + aggr_asn INTEGER, + aggr_ip TEXT, + elem_json TEXT NOT NULL, + PRIMARY KEY (collector, peer_ip, peer_asn, prefix, path_id) + ); + CREATE INDEX IF NOT EXISTS idx_rib_state_collector ON rib_state(collector); + CREATE INDEX IF NOT EXISTS idx_rib_state_peer_asn ON rib_state(peer_asn); + CREATE INDEX IF NOT EXISTS idx_rib_state_prefix ON rib_state(prefix); + "#, + ) + .map_err(|e| anyhow!("Failed to initialize rib_state schema: {}", e))?; + Ok(()) + } + + pub fn count(&self) -> Result { + self.db.table_count("rib_state") + } + + pub fn route_exists(&self, key: &RibRouteKey) -> Result { + let exists = self + .db + .conn + .query_row( + "SELECT 1 FROM rib_state WHERE collector = ?1 AND peer_ip = ?2 AND peer_asn = ?3 AND prefix = ?4 AND path_id = ?5", + params![ + key.collector, + key.peer_ip, + key.peer_asn, + key.prefix, + opt_to_sql_i64(key.path_id), + ], + |_| Ok(()), + ) + .optional() + .map_err(|e| anyhow!("Failed to test route existence in rib_state: {}", e))?; + Ok(exists.is_some()) + } + + pub fn upsert_entry(&self, entry: &StoredRibEntry) -> Result<()> { + self.upsert_entries(std::slice::from_ref(entry)) + } + + pub fn upsert_entries(&self, entries: &[StoredRibEntry]) -> Result<()> { + if entries.is_empty() { + return Ok(()); + } + + let tx = self + .db + .conn + .unchecked_transaction() + .map_err(|e| anyhow!("Failed to begin rib_state transaction: {}", e))?; + let mut stmt = tx + .prepare_cached( + r#" + INSERT OR REPLACE INTO rib_state ( + collector, peer_ip, peer_asn, prefix, path_id, timestamp, + as_path, origin_asns, origin, next_hop, local_pref, med, + communities, atomic, aggr_asn, aggr_ip, elem_json + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17) + "#, + ) + .map_err(|e| anyhow!("Failed to prepare rib_state upsert statement: {}", e))?; + + for entry in entries { + stmt.execute(params![ + entry.collector, + entry.elem.peer_ip.to_string(), + entry.elem.peer_asn.to_u32(), + entry.elem.prefix.prefix.to_string(), + opt_to_sql_i64(entry.elem.prefix.path_id), + entry.elem.timestamp, + elem_as_path(&entry.elem), + elem_origin_asns(&entry.elem), + elem_origin(&entry.elem), + elem_next_hop(&entry.elem), + entry.elem.local_pref, + entry.elem.med, + elem_communities(&entry.elem), + if entry.elem.atomic { 1_i64 } else { 0_i64 }, + entry.elem.aggr_asn.map(|asn| asn.to_u32()), + entry.elem.aggr_ip.as_ref().map(|ip| ip.to_string()), + entry.elem_json()?, + ]) + .map_err(|e| anyhow!("Failed to upsert entry into rib_state: {}", e))?; + } + + drop(stmt); + tx.commit() + .map_err(|e| anyhow!("Failed to commit rib_state upserts: {}", e))?; + Ok(()) + } + + pub fn delete_key(&self, key: &RibRouteKey) -> Result<()> { + self.delete_keys(std::slice::from_ref(key)) + } + + pub fn delete_keys(&self, keys: &[RibRouteKey]) -> Result<()> { + if keys.is_empty() { + return Ok(()); + } + + let tx = self + .db + .conn + .unchecked_transaction() + .map_err(|e| anyhow!("Failed to begin rib_state delete transaction: {}", e))?; + let mut stmt = tx + .prepare_cached( + "DELETE FROM rib_state WHERE collector = ?1 AND peer_ip = ?2 AND peer_asn = ?3 AND prefix = ?4 AND path_id = ?5", + ) + .map_err(|e| anyhow!("Failed to prepare rib_state delete statement: {}", e))?; + + for key in keys { + stmt.execute(params![ + key.collector, + key.peer_ip, + key.peer_asn, + key.prefix, + opt_to_sql_i64(key.path_id), + ]) + .map_err(|e| anyhow!("Failed to delete entry from rib_state: {}", e))?; + } + + drop(stmt); + tx.commit() + .map_err(|e| anyhow!("Failed to commit rib_state deletes: {}", e))?; + Ok(()) + } + + pub fn visit_entries(&self, mut visitor: F) -> Result<()> + where + F: FnMut(StoredRibEntry) -> Result<()>, + { + let mut stmt = self + .db + .conn + .prepare( + "SELECT collector, elem_json FROM rib_state ORDER BY collector, peer_asn, peer_ip, prefix, path_id", + ) + .map_err(|e| anyhow!("Failed to prepare rib_state scan statement: {}", e))?; + + let mut rows = stmt + .query([]) + .map_err(|e| anyhow!("Failed to query rib_state rows: {}", e))?; + + while let Some(row) = rows + .next() + .map_err(|e| anyhow!("Failed to iterate rib_state rows: {}", e))? + { + visitor(StoredRibEntry::from_row(row)?)?; + } + + Ok(()) + } +} + +pub struct RibSqliteStore { + db: DatabaseConn, +} + +impl RibSqliteStore { + pub fn new(db_path: &str, reset: bool) -> Result { + let db = DatabaseConn::open_path(db_path)?; + let store = Self { db }; + store.initialize(reset)?; + Ok(store) + } + + fn initialize(&self, reset: bool) -> Result<()> { + if reset { + self.db + .conn + .execute("DROP TABLE IF EXISTS elems", []) + .map_err(|e| anyhow!("Failed to drop existing rib output elems table: {}", e))?; + } + + self.db + .conn + .execute_batch( + r#" + CREATE TABLE IF NOT EXISTS elems ( + rib_ts INTEGER NOT NULL, + timestamp REAL NOT NULL, + collector TEXT NOT NULL, + peer_ip TEXT NOT NULL, + peer_asn INTEGER NOT NULL, + prefix TEXT NOT NULL, + path_id INTEGER NOT NULL, + as_path TEXT, + origin_asns TEXT, + origin TEXT, + next_hop TEXT, + local_pref INTEGER, + med INTEGER, + communities TEXT, + atomic INTEGER NOT NULL, + aggr_asn INTEGER, + aggr_ip TEXT + ); + CREATE INDEX IF NOT EXISTS idx_rib_output_rib_ts ON elems(rib_ts); + CREATE INDEX IF NOT EXISTS idx_rib_output_rib_ts_prefix ON elems(rib_ts, prefix); + CREATE INDEX IF NOT EXISTS idx_rib_output_rib_ts_peer_asn ON elems(rib_ts, peer_asn); + CREATE INDEX IF NOT EXISTS idx_rib_output_rib_ts_collector ON elems(rib_ts, collector); + "#, + ) + .map_err(|e| anyhow!("Failed to initialize rib output SQLite schema: {}", e))?; + Ok(()) + } + + pub fn insert_entry(&self, rib_ts: i64, entry: &StoredRibEntry) -> Result<()> { + self.db + .conn + .execute( + r#" + INSERT INTO elems ( + rib_ts, timestamp, collector, peer_ip, peer_asn, prefix, path_id, + as_path, origin_asns, origin, next_hop, local_pref, med, + communities, atomic, aggr_asn, aggr_ip + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17) + "#, + params![ + rib_ts, + entry.elem.timestamp, + entry.collector, + entry.elem.peer_ip.to_string(), + entry.elem.peer_asn.to_u32(), + entry.elem.prefix.prefix.to_string(), + opt_to_sql_i64(entry.elem.prefix.path_id), + elem_as_path(&entry.elem), + elem_origin_asns(&entry.elem), + elem_origin(&entry.elem), + elem_next_hop(&entry.elem), + entry.elem.local_pref, + entry.elem.med, + elem_communities(&entry.elem), + if entry.elem.atomic { 1_i64 } else { 0_i64 }, + entry.elem.aggr_asn.map(|asn| asn.to_u32()), + entry.elem.aggr_ip.as_ref().map(|ip| ip.to_string()), + ], + ) + .map_err(|e| anyhow!("Failed to insert entry into rib output SQLite store: {}", e))?; + Ok(()) + } +} + +pub fn elem_matches_stored_json(elem_json: &str, key: &str) -> Result> { + let value = serde_json::from_str::(elem_json) + .map_err(|e| anyhow!("Failed to deserialize stored elem_json value: {}", e))?; + Ok(value.get(key).cloned()) +} + +pub fn path_id_for_key(path_id: Option) -> i64 { + opt_to_sql_i64(path_id) +} + +pub fn path_id_from_key(path_id: i64) -> Option { + sql_i64_to_opt(path_id) +} + +#[cfg(test)] +mod tests { + use super::*; + use bgpkit_parser::models::{AsPath, AsPathSegment, ElemType, NetworkPrefix}; + use std::net::{IpAddr, Ipv4Addr}; + + fn test_elem() -> Result { + Ok(BgpElem { + timestamp: 1234.0, + elem_type: ElemType::ANNOUNCE, + peer_ip: IpAddr::V4(Ipv4Addr::new(192, 0, 2, 1)), + peer_asn: 64496.into(), + prefix: NetworkPrefix::new("203.0.113.0/24".parse()?, Some(7)), + next_hop: Some(IpAddr::V4(Ipv4Addr::new(192, 0, 2, 2))), + as_path: Some(AsPath { + segments: vec![AsPathSegment::AsSequence(vec![64496.into(), 64497.into()])], + }), + origin_asns: Some(vec![64497.into()]), + origin: None, + local_pref: Some(100), + med: Some(50), + communities: None, + atomic: false, + aggr_asn: None, + aggr_ip: None, + only_to_customer: None, + unknown: None, + deprecated: None, + }) + } + + #[test] + fn test_rib_state_store_round_trip() -> Result<()> { + let store = RibStateStore::new_temp()?; + let entry = StoredRibEntry::new("rrc00", test_elem()?); + store.upsert_entry(&entry)?; + assert!(store.route_exists(&entry.route_key())?); + + let mut visited = Vec::new(); + store.visit_entries(|entry| { + visited.push(entry); + Ok(()) + })?; + + assert_eq!(visited.len(), 1); + assert_eq!(visited[0].collector, "rrc00"); + assert_eq!(visited[0].elem.prefix.path_id, Some(7)); + Ok(()) + } + + #[test] + fn test_path_id_helpers() { + assert_eq!(path_id_for_key(None), -1); + assert_eq!(path_id_from_key(-1), None); + assert_eq!(path_id_from_key(42), Some(42)); + } + + #[test] + fn test_elem_json_access() -> Result<()> { + let elem = test_elem()?; + let entry = StoredRibEntry::new("rrc00", elem); + let origin_asns = elem_matches_stored_json(&entry.elem_json()?, "origin_asns")?; + assert!(origin_asns.is_some()); + Ok(()) + } +} diff --git a/src/lens/mod.rs b/src/lens/mod.rs index 3deaa96..937b883 100644 --- a/src/lens/mod.rs +++ b/src/lens/mod.rs @@ -78,6 +78,10 @@ pub mod parse; #[cfg(feature = "lib")] pub mod search; +// RibLens - arbitrary timestamp RIB reconstruction +#[cfg(feature = "lib")] +pub mod rib; + // RpkiLens - RPKI validation and data #[cfg(feature = "lib")] pub mod rpki; diff --git a/src/lens/rib/mod.rs b/src/lens/rib/mod.rs new file mode 100644 index 0000000..f40b923 --- /dev/null +++ b/src/lens/rib/mod.rs @@ -0,0 +1,1143 @@ +//! RIB reconstruction lens. +//! +//! This module reconstructs final RIB state at arbitrary timestamps by: +//! 1. Selecting the latest RIB before each target time +//! 2. Replaying overlapping updates up to the exact target time +//! 3. Materializing only the final route state for each requested `rib_ts` + +use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; + +use anyhow::{anyhow, Result}; +use bgpkit_broker::{BgpkitBroker, BrokerItem}; +use bgpkit_parser::models::ElemType; +use bgpkit_parser::BgpElem; +use chrono::{DateTime, Duration}; +use regex::Regex; +use serde::{Deserialize, Serialize}; + +use crate::config::MonocleConfig; +use crate::database::{MonocleDatabase, RibRouteKey, RibStateStore, StoredRibEntry}; +use crate::lens::country::CountryLens; +use crate::lens::parse::ParseFilters; +use crate::lens::time::TimeLens; + +#[cfg(feature = "cli")] +use clap::{Args, ValueEnum}; + +const FULL_FEED_V4_THRESHOLD: u32 = 800_000; +const FULL_FEED_V6_THRESHOLD: u32 = 100_000; +const RIB_LOOKBACK_HOURS: i64 = 24 * 30; +const UPDATES_LOOKAHEAD_HOURS: i64 = 2; + +type FullFeedAllowlists = HashMap>; + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[cfg_attr(feature = "cli", derive(ValueEnum))] +pub enum RibOutputType { + Sqlite, +} + +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[cfg_attr(feature = "cli", derive(Args))] +pub struct RibFilters { + /// Target RIB timestamp. Repeat to request multiple snapshots. + #[cfg_attr(feature = "cli", clap(long = "ts", required = true))] + #[serde(default)] + pub rib_ts: Vec, + + /// Filter by origin AS Number(s), comma-separated. Prefix with ! to exclude. + #[cfg_attr(feature = "cli", clap(short = 'o', long, value_delimiter = ','))] + #[serde(default)] + pub origin_asn: Vec, + + /// Filter by origin ASN registration country. + #[cfg_attr(feature = "cli", clap(short = 'C', long))] + pub country: Option, + + /// Filter by network prefix(es), comma-separated. Prefix with ! to exclude. + #[cfg_attr(feature = "cli", clap(short = 'p', long, value_delimiter = ','))] + #[serde(default)] + pub prefix: Vec, + + /// Include super-prefixes when filtering. + #[cfg_attr(feature = "cli", clap(short = 's', long))] + #[serde(default)] + pub include_super: bool, + + /// Include sub-prefixes when filtering. + #[cfg_attr(feature = "cli", clap(short = 'S', long))] + #[serde(default)] + pub include_sub: bool, + + /// Filter by peer ASN(s), comma-separated. Prefix with ! to exclude. + #[cfg_attr(feature = "cli", clap(short = 'J', long, value_delimiter = ','))] + #[serde(default)] + pub peer_asn: Vec, + + /// Filter by AS path regex string. + #[cfg_attr(feature = "cli", clap(short = 'a', long))] + pub as_path: Option, + + /// Filter by collector, e.g., rrc00 or route-views2. + #[cfg_attr(feature = "cli", clap(short = 'c', long))] + pub collector: Option, + + /// Filter by route collection project, i.e. riperis or routeviews. + #[cfg_attr(feature = "cli", clap(short = 'P', long))] + pub project: Option, + + /// Keep only full-feed peers based on broker peer metadata. + #[cfg_attr(feature = "cli", clap(long))] + #[serde(default)] + pub full_feed_only: bool, +} + +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[cfg_attr(feature = "cli", derive(Args))] +pub struct RibArgs { + #[cfg_attr(feature = "cli", clap(flatten))] + #[serde(flatten)] + pub filters: RibFilters, + + /// File output type. If omitted and `--output-dir` is also omitted, output goes to stdout. + #[cfg_attr(feature = "cli", clap(long, value_enum))] + pub output_type: Option, + + /// Output directory for generated SQLite files. + #[cfg_attr(feature = "cli", clap(long))] + pub output_dir: Option, +} + +impl RibArgs { + pub fn normalized_rib_ts(&self) -> Result> { + let time_lens = TimeLens::new(); + let mut timestamps = BTreeSet::new(); + + for value in &self.filters.rib_ts { + let ts = time_lens + .parse_time_string(value) + .map_err(|e| anyhow!("Invalid --ts value '{}': {}", value, e))? + .timestamp(); + timestamps.insert(ts); + } + + if timestamps.is_empty() { + return Err(anyhow!("At least one --ts value is required")); + } + + Ok(timestamps.into_iter().collect()) + } + + pub fn file_output_type(&self) -> Option { + match (self.output_type, self.output_dir.is_some()) { + (Some(output_type), _) => Some(output_type), + (None, true) => Some(RibOutputType::Sqlite), + (None, false) => None, + } + } + + pub fn validate(&self) -> Result> { + let normalized_ts = self.normalized_rib_ts()?; + + let parse_filters = ParseFilters { + origin_asn: self.filters.origin_asn.clone(), + prefix: self.filters.prefix.clone(), + include_super: self.filters.include_super, + include_sub: self.filters.include_sub, + peer_asn: self.filters.peer_asn.clone(), + as_path: self.filters.as_path.clone(), + ..Default::default() + }; + parse_filters.validate()?; + + if let Some(as_path) = &self.filters.as_path { + Regex::new(as_path) + .map_err(|e| anyhow!("Invalid --as-path regex '{}': {}", as_path, e))?; + } + + if normalized_ts.len() > 1 && self.file_output_type().is_none() { + return Err(anyhow!( + "Multiple --ts values require file output. Use --output-type and optionally --output-dir." + )); + } + + Ok(normalized_ts) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RibRow { + pub rib_ts: i64, + pub timestamp: f64, + pub collector: String, + pub peer_ip: String, + pub peer_asn: u32, + pub prefix: String, + pub path_id: Option, + pub as_path: Option, + pub origin_asns: Option, + pub origin: Option, + pub next_hop: Option, + pub local_pref: Option, + pub med: Option, + pub communities: Option, + pub atomic: bool, + pub aggr_asn: Option, + pub aggr_ip: Option, +} + +impl RibRow { + pub fn from_entry(rib_ts: i64, entry: &StoredRibEntry) -> Self { + Self { + rib_ts, + timestamp: entry.elem.timestamp, + collector: entry.collector.clone(), + peer_ip: entry.elem.peer_ip.to_string(), + peer_asn: entry.elem.peer_asn.to_u32(), + prefix: entry.elem.prefix.prefix.to_string(), + path_id: entry.elem.prefix.path_id, + as_path: entry.elem.as_path.as_ref().map(|path| path.to_string()), + origin_asns: entry.elem.origin_asns.as_ref().map(|asns| { + asns.iter() + .map(|asn| asn.to_string()) + .collect::>() + .join(" ") + }), + origin: entry.elem.origin.as_ref().map(|origin| origin.to_string()), + next_hop: entry.elem.next_hop.as_ref().map(|hop| hop.to_string()), + local_pref: entry.elem.local_pref, + med: entry.elem.med, + communities: entry.elem.communities.as_ref().map(|communities| { + communities + .iter() + .map(|community| community.to_string()) + .collect::>() + .join(" ") + }), + atomic: entry.elem.atomic, + aggr_asn: entry.elem.aggr_asn.map(|asn| asn.to_u32()), + aggr_ip: entry.elem.aggr_ip.as_ref().map(|ip| ip.to_string()), + } + } +} + +#[derive(Debug, Clone)] +pub struct RibRunSummary { + pub rib_ts: Vec, + pub collectors_processed: usize, + pub groups_processed: usize, +} + +#[derive(Debug, Clone)] +struct RibReplayGroup { + collector: String, + rib_item: BrokerItem, + rib_ts: Vec, + updates: Vec, +} + +#[derive(Debug, Clone)] +enum DeltaOp { + Upsert(StoredRibEntry), + Delete(RibRouteKey), +} + +#[derive(Debug, Clone)] +struct OriginFilter { + values: HashSet, + negated: bool, +} + +pub struct RibLens<'a> { + db: &'a MonocleDatabase, + config: &'a MonocleConfig, +} + +impl<'a> RibLens<'a> { + pub fn new(db: &'a MonocleDatabase, config: &'a MonocleConfig) -> Self { + Self { db, config } + } + + pub fn reconstruct_snapshots( + &self, + args: &RibArgs, + no_update: bool, + mut snapshot_visitor: F, + ) -> Result + where + F: FnMut(i64, &RibStateStore) -> Result<()>, + { + let normalized_ts = args.validate()?; + let country_asns = self.resolve_country_asns(args.filters.country.as_deref(), no_update)?; + let origin_filter = Self::parse_origin_filter(&args.filters.origin_asn)?; + let as_path_regex = Self::compile_as_path_regex(args.filters.as_path.as_deref())?; + let groups = self.resolve_replay_groups(args, &normalized_ts)?; + + let allowlists = if args.filters.full_feed_only { + self.build_full_feed_allowlists(&groups)? + } else { + HashMap::new() + }; + + for group in &groups { + let state_store = RibStateStore::new_temp()?; + let safe_base_filters = self.safe_parse_filters( + args, + group.rib_item.ts_start.and_utc().timestamp(), + group.rib_item.ts_end.and_utc().timestamp(), + ); + + self.load_base_rib( + &state_store, + &group.collector, + &group.rib_item, + &safe_base_filters, + country_asns.as_ref(), + origin_filter.as_ref(), + as_path_regex.as_ref(), + allowlists.get(group.collector.as_str()), + )?; + + self.replay_updates( + &state_store, + group, + args, + country_asns.as_ref(), + origin_filter.as_ref(), + as_path_regex.as_ref(), + allowlists.get(group.collector.as_str()), + &mut snapshot_visitor, + )?; + } + + let collector_count = groups + .iter() + .map(|group| group.collector.as_str()) + .collect::>() + .len(); + + Ok(RibRunSummary { + rib_ts: normalized_ts, + collectors_processed: collector_count, + groups_processed: groups.len(), + }) + } + + pub fn output_directory(&self, args: &RibArgs) -> Result> { + match args.file_output_type() { + None => Ok(None), + Some(_) => { + let dir = match &args.output_dir { + Some(path) => std::path::PathBuf::from(path), + None => std::env::current_dir() + .map_err(|e| anyhow!("Failed to determine current directory: {}", e))?, + }; + Ok(Some(dir)) + } + } + } + + pub fn file_name_prefix(&self, args: &RibArgs, rib_ts: &[i64]) -> Result { + let base = if rib_ts.len() == 1 { + format!( + "monocle-rib-{}", + Self::format_rib_ts_for_filename(rib_ts[0])? + ) + } else { + format!( + "monocle-rib-{}-{}", + Self::format_rib_ts_for_filename( + *rib_ts + .first() + .ok_or_else(|| anyhow!("missing first rib_ts"))? + )?, + Self::format_rib_ts_for_filename( + *rib_ts + .last() + .ok_or_else(|| anyhow!("missing last rib_ts"))? + )?, + ) + }; + + let slug = self.filter_slug(&args.filters)?; + if slug.is_empty() { + Ok(base) + } else { + Ok(format!("{}-{}", base, slug)) + } + } + + pub fn single_snapshot_file_name( + &self, + args: &RibArgs, + rib_ts: i64, + output_type: RibOutputType, + ) -> Result { + let prefix = self.file_name_prefix(args, &[rib_ts])?; + let ext = match output_type { + RibOutputType::Sqlite => "sqlite3", + }; + Ok(format!("{}.{}", prefix, ext)) + } + + fn resolve_country_asns( + &self, + country: Option<&str>, + no_update: bool, + ) -> Result>> { + let Some(country) = country else { + return Ok(None); + }; + + let country_code = self.resolve_country_code(country)?; + let asinfo = self.db.asinfo(); + + if asinfo.is_empty() { + if no_update { + return Err(anyhow!( + "ASInfo data is empty but --country was requested. Re-run without --no-update or refresh ASInfo first." + )); + } + self.db + .refresh_asinfo() + .map_err(|e| anyhow!("Failed to refresh ASInfo data for country filter: {}", e))?; + } else if !no_update && asinfo.needs_refresh(self.config.asinfo_cache_ttl()) { + self.db.refresh_asinfo().map_err(|e| { + anyhow!( + "Failed to refresh stale ASInfo data for country filter: {}", + e + ) + })?; + } + + let mut asns = HashSet::new(); + let mut stmt = self + .db + .connection() + .prepare("SELECT asn FROM asinfo_core WHERE UPPER(country) = UPPER(?1) ORDER BY asn") + .map_err(|e| anyhow!("Failed to prepare ASInfo country lookup: {}", e))?; + let rows = stmt + .query_map([country_code.clone()], |row| row.get::<_, u32>(0)) + .map_err(|e| { + anyhow!( + "Failed to query ASInfo by country '{}': {}", + country_code, + e + ) + })?; + + for row in rows { + asns.insert(row.map_err(|e| anyhow!("Failed to decode ASInfo country row: {}", e))?); + } + + Ok(Some(asns)) + } + + fn resolve_country_code(&self, input: &str) -> Result { + let lens = CountryLens::new(); + let matches = lens.lookup(input); + + if matches.is_empty() { + if input.len() == 2 { + return Ok(input.to_uppercase()); + } + return Err(anyhow!("Unknown country filter '{}'", input)); + } + + let exact_name_matches: Vec<_> = matches + .iter() + .filter(|entry| entry.name.eq_ignore_ascii_case(input)) + .collect(); + if exact_name_matches.len() == 1 { + return Ok(exact_name_matches[0].code.clone()); + } + + let exact_code_matches: Vec<_> = matches + .iter() + .filter(|entry| entry.code.eq_ignore_ascii_case(input)) + .collect(); + if exact_code_matches.len() == 1 { + return Ok(exact_code_matches[0].code.clone()); + } + + if matches.len() == 1 { + return Ok(matches[0].code.clone()); + } + + Err(anyhow!( + "Country filter '{}' is ambiguous; matches: {}", + input, + matches + .iter() + .map(|entry| format!("{} ({})", entry.name, entry.code)) + .collect::>() + .join(", ") + )) + } + + fn parse_origin_filter(values: &[String]) -> Result> { + if values.is_empty() { + return Ok(None); + } + + let negated = values + .first() + .map(|value| value.starts_with('!')) + .unwrap_or(false); + let mut parsed = HashSet::new(); + + for value in values { + let asn = value + .trim_start_matches('!') + .parse::() + .map_err(|e| anyhow!("Invalid origin ASN filter '{}': {}", value, e))?; + parsed.insert(asn); + } + + Ok(Some(OriginFilter { + values: parsed, + negated, + })) + } + + fn compile_as_path_regex(pattern: Option<&str>) -> Result> { + pattern + .map(|pattern| { + Regex::new(pattern) + .map_err(|e| anyhow!("Invalid --as-path regex '{}': {}", pattern, e)) + }) + .transpose() + } + + fn resolve_replay_groups( + &self, + args: &RibArgs, + normalized_ts: &[i64], + ) -> Result> { + let first_ts = *normalized_ts + .first() + .ok_or_else(|| anyhow!("Missing earliest rib_ts after validation"))?; + let last_ts = *normalized_ts + .last() + .ok_or_else(|| anyhow!("Missing latest rib_ts after validation"))?; + + let ribs = self + .base_broker(args) + .data_type("rib") + .ts_start(Self::timestamp_to_broker_string( + first_ts - Duration::hours(RIB_LOOKBACK_HOURS).num_seconds(), + )?) + .ts_end(Self::timestamp_to_broker_string(last_ts)?) + .query() + .map_err(|e| anyhow!("Failed to query broker for candidate RIB files: {}", e))?; + + let mut ribs_by_collector: BTreeMap> = BTreeMap::new(); + for item in ribs { + ribs_by_collector + .entry(item.collector_id.clone()) + .or_default() + .push(item); + } + + let mut groups = Vec::new(); + for (collector, mut collector_ribs) in ribs_by_collector { + collector_ribs.sort_by_key(|item| item.ts_start); + + let mut timestamps_by_rib: BTreeMap)> = BTreeMap::new(); + for rib_ts in normalized_ts { + let selected_rib = collector_ribs + .iter() + .filter(|item| item.ts_start.and_utc().timestamp() <= *rib_ts) + .max_by_key(|item| item.ts_start); + + let Some(selected_rib) = selected_rib else { + return Err(anyhow!( + "No RIB file found at or before {} for collector {}", + Self::format_rib_ts_for_error(*rib_ts)?, + collector + )); + }; + + timestamps_by_rib + .entry(selected_rib.url.clone()) + .and_modify(|(_, timestamps)| timestamps.push(*rib_ts)) + .or_insert_with(|| (selected_rib.clone(), vec![*rib_ts])); + } + + for (_, (rib_item, mut group_ts)) in timestamps_by_rib { + group_ts.sort_unstable(); + let group_max_ts = *group_ts + .last() + .ok_or_else(|| anyhow!("Replay group was created without any rib_ts"))?; + let updates = + self.resolve_group_updates(args, &collector, &rib_item, group_max_ts)?; + + groups.push(RibReplayGroup { + collector: collector.clone(), + rib_item, + rib_ts: group_ts, + updates, + }); + } + } + + groups.sort_by(|a, b| { + a.collector + .cmp(&b.collector) + .then(a.rib_item.ts_start.cmp(&b.rib_item.ts_start)) + }); + + if groups.is_empty() { + return Err(anyhow!( + "No suitable RIB files were found for the requested timestamps and collector filters." + )); + } + + Ok(groups) + } + + fn resolve_group_updates( + &self, + args: &RibArgs, + collector: &str, + rib_item: &BrokerItem, + group_max_ts: i64, + ) -> Result> { + let rib_ts = rib_item.ts_start.and_utc().timestamp(); + let query_end = group_max_ts + Duration::hours(UPDATES_LOOKAHEAD_HOURS).num_seconds(); + + let mut broker = self + .base_broker(args) + .collector_id(collector) + .data_type("updates") + .ts_start(Self::timestamp_to_broker_string(rib_ts)?) + .ts_end(Self::timestamp_to_broker_string(query_end)?); + + if let Some(project) = &args.filters.project { + broker = broker.project(project); + } + + let mut updates = broker.query().map_err(|e| { + anyhow!( + "Failed to query broker for updates for {}: {}", + collector, + e + ) + })?; + + updates.retain(|item| { + let item_start = item.ts_start.and_utc().timestamp(); + let item_end = item.ts_end.and_utc().timestamp(); + item_start <= group_max_ts && item_end > rib_ts + }); + updates.sort_by_key(|item| item.ts_start); + Ok(updates) + } + + fn build_full_feed_allowlists(&self, groups: &[RibReplayGroup]) -> Result { + let mut allowlists = HashMap::new(); + + for collector in groups + .iter() + .map(|group| group.collector.as_str()) + .collect::>() + { + let peers = BgpkitBroker::new() + .collector_id(collector) + .get_peers() + .map_err(|e| { + anyhow!( + "Failed to fetch broker peer metadata for {}: {}", + collector, + e + ) + })?; + + let allowed = peers + .into_iter() + .filter(|peer| { + peer.num_v4_pfxs >= FULL_FEED_V4_THRESHOLD + || peer.num_v6_pfxs >= FULL_FEED_V6_THRESHOLD + }) + .map(|peer| (peer.ip.to_string(), peer.asn)) + .collect::>(); + + allowlists.insert(collector.to_string(), allowed); + } + + Ok(allowlists) + } + + #[allow(clippy::too_many_arguments)] + fn load_base_rib( + &self, + state_store: &RibStateStore, + collector: &str, + rib_item: &BrokerItem, + safe_filters: &ParseFilters, + country_asns: Option<&HashSet>, + origin_filter: Option<&OriginFilter>, + as_path_regex: Option<&Regex>, + full_feed_allowlist: Option<&HashSet<(String, u32)>>, + ) -> Result<()> { + let parser = safe_filters.to_parser(&rib_item.url).map_err(|e| { + anyhow!( + "Failed to build parser for base RIB {}: {}", + rib_item.url, + e + ) + })?; + + let mut batch = Vec::new(); + for elem in parser { + if elem.elem_type != ElemType::ANNOUNCE { + continue; + } + if self.announce_matches( + collector, + &elem, + country_asns, + origin_filter, + as_path_regex, + full_feed_allowlist, + ) { + batch.push(StoredRibEntry::new(collector.to_string(), elem)); + } + } + + state_store.upsert_entries(&batch)?; + Ok(()) + } + + #[allow(clippy::too_many_arguments)] + fn replay_updates( + &self, + state_store: &RibStateStore, + group: &RibReplayGroup, + args: &RibArgs, + country_asns: Option<&HashSet>, + origin_filter: Option<&OriginFilter>, + as_path_regex: Option<&Regex>, + full_feed_allowlist: Option<&HashSet<(String, u32)>>, + snapshot_visitor: &mut F, + ) -> Result<()> + where + F: FnMut(i64, &RibStateStore) -> Result<()>, + { + let mut pending = HashMap::::new(); + let mut next_snapshot_index = 0usize; + + for update in &group.updates { + let safe_filters = self.safe_parse_filters( + args, + group.rib_item.ts_start.and_utc().timestamp(), + *group + .rib_ts + .last() + .ok_or_else(|| anyhow!("Replay group missing max rib_ts"))?, + ); + let parser = safe_filters.to_parser(&update.url).map_err(|e| { + anyhow!( + "Failed to build parser for updates file {}: {}", + update.url, + e + ) + })?; + + for elem in parser { + while next_snapshot_index < group.rib_ts.len() + && elem.timestamp > group.rib_ts[next_snapshot_index] as f64 + { + self.flush_pending(state_store, &mut pending)?; + snapshot_visitor(group.rib_ts[next_snapshot_index], state_store)?; + next_snapshot_index += 1; + } + + self.apply_update_to_delta( + &mut pending, + state_store, + &group.collector, + elem, + country_asns, + origin_filter, + as_path_regex, + full_feed_allowlist, + )?; + } + } + + while next_snapshot_index < group.rib_ts.len() { + self.flush_pending(state_store, &mut pending)?; + snapshot_visitor(group.rib_ts[next_snapshot_index], state_store)?; + next_snapshot_index += 1; + } + + Ok(()) + } + + #[allow(clippy::too_many_arguments)] + fn apply_update_to_delta( + &self, + pending: &mut HashMap, + state_store: &RibStateStore, + collector: &str, + elem: BgpElem, + country_asns: Option<&HashSet>, + origin_filter: Option<&OriginFilter>, + as_path_regex: Option<&Regex>, + full_feed_allowlist: Option<&HashSet<(String, u32)>>, + ) -> Result<()> { + let route_key = RibRouteKey::from_elem(collector, &elem); + + match elem.elem_type { + ElemType::WITHDRAW => { + if self.route_exists_in_state_or_delta(&route_key, state_store, pending)? { + pending.insert(route_key.clone(), DeltaOp::Delete(route_key)); + } + } + ElemType::ANNOUNCE => { + let matches = self.announce_matches( + collector, + &elem, + country_asns, + origin_filter, + as_path_regex, + full_feed_allowlist, + ); + + if matches { + pending.insert( + route_key, + DeltaOp::Upsert(StoredRibEntry::new(collector.to_string(), elem)), + ); + } else if self.route_exists_in_state_or_delta(&route_key, state_store, pending)? { + pending.insert(route_key.clone(), DeltaOp::Delete(route_key)); + } + } + } + + Ok(()) + } + + fn route_exists_in_state_or_delta( + &self, + route_key: &RibRouteKey, + state_store: &RibStateStore, + pending: &HashMap, + ) -> Result { + if let Some(delta) = pending.get(route_key) { + return Ok(matches!(delta, DeltaOp::Upsert(_))); + } + state_store.route_exists(route_key) + } + + fn flush_pending( + &self, + state_store: &RibStateStore, + pending: &mut HashMap, + ) -> Result<()> { + if pending.is_empty() { + return Ok(()); + } + + let mut upserts = Vec::new(); + let mut deletes = Vec::new(); + + for delta in pending.values() { + match delta { + DeltaOp::Upsert(entry) => upserts.push(entry.clone()), + DeltaOp::Delete(key) => deletes.push(key.clone()), + } + } + + if !upserts.is_empty() { + state_store.upsert_entries(&upserts)?; + } + if !deletes.is_empty() { + state_store.delete_keys(&deletes)?; + } + + pending.clear(); + Ok(()) + } + + fn announce_matches( + &self, + collector: &str, + elem: &BgpElem, + country_asns: Option<&HashSet>, + origin_filter: Option<&OriginFilter>, + as_path_regex: Option<&Regex>, + full_feed_allowlist: Option<&HashSet<(String, u32)>>, + ) -> bool { + if collector.is_empty() { + return false; + } + + if let Some(origin_filter) = origin_filter { + let matches_origin = elem + .origin_asns + .as_ref() + .map(|origins| { + origins + .iter() + .any(|asn| origin_filter.values.contains(&asn.to_u32())) + }) + .unwrap_or(false); + + if origin_filter.negated { + if matches_origin { + return false; + } + } else if !matches_origin { + return false; + } + } + + if let Some(country_asns) = country_asns { + let matches_country = elem + .origin_asns + .as_ref() + .map(|origins| { + origins + .iter() + .any(|asn| country_asns.contains(&asn.to_u32())) + }) + .unwrap_or(false); + if !matches_country { + return false; + } + } + + if let Some(as_path_regex) = as_path_regex { + let as_path = elem + .as_path + .as_ref() + .map(|path| path.to_string()) + .unwrap_or_default(); + if !as_path_regex.is_match(&as_path) { + return false; + } + } + + if let Some(full_feed_allowlist) = full_feed_allowlist { + let peer_key = (elem.peer_ip.to_string(), elem.peer_asn.to_u32()); + if !full_feed_allowlist.contains(&peer_key) { + return false; + } + } + + true + } + + fn safe_parse_filters(&self, args: &RibArgs, start_ts: i64, end_ts: i64) -> ParseFilters { + ParseFilters { + prefix: args.filters.prefix.clone(), + include_super: args.filters.include_super, + include_sub: args.filters.include_sub, + peer_asn: args.filters.peer_asn.clone(), + start_ts: Some(start_ts.to_string()), + end_ts: Some(end_ts.to_string()), + ..Default::default() + } + } + + fn base_broker(&self, args: &RibArgs) -> BgpkitBroker { + let mut broker = BgpkitBroker::new().page_size(1000); + if let Some(collector) = &args.filters.collector { + broker = broker.collector_id(collector); + } + if let Some(project) = &args.filters.project { + broker = broker.project(project); + } + broker + } + + fn timestamp_to_broker_string(ts: i64) -> Result { + let timestamp = DateTime::from_timestamp(ts, 0) + .ok_or_else(|| anyhow!("Invalid Unix timestamp {} for broker query", ts))?; + Ok(timestamp.format("%Y-%m-%dT%H:%M:%SZ").to_string()) + } + + fn format_rib_ts_for_filename(rib_ts: i64) -> Result { + let timestamp = DateTime::from_timestamp(rib_ts, 0) + .ok_or_else(|| anyhow!("Invalid Unix timestamp {} for file naming", rib_ts))?; + Ok(timestamp.format("%Y%m%dT%H%M%SZ").to_string()) + } + + fn format_rib_ts_for_error(rib_ts: i64) -> Result { + let timestamp = DateTime::from_timestamp(rib_ts, 0) + .ok_or_else(|| anyhow!("Invalid Unix timestamp {} for error reporting", rib_ts))?; + Ok(timestamp.format("%Y-%m-%dT%H:%M:%SZ").to_string()) + } + + fn filter_slug(&self, filters: &RibFilters) -> Result { + let mut parts = Vec::new(); + + if let Some(country) = &filters.country { + parts.push(format!( + "country-{}", + Self::sanitize_slug_component(country) + )); + } + if !filters.origin_asn.is_empty() { + parts.push(format!( + "origin-{}", + Self::sanitize_list_component(&filters.origin_asn) + )); + } + if !filters.peer_asn.is_empty() { + parts.push(format!( + "peer-{}", + Self::sanitize_list_component(&filters.peer_asn) + )); + } + if let Some(collector) = &filters.collector { + let values = collector + .split(',') + .map(|value| value.trim().to_string()) + .collect::>(); + parts.push(format!( + "collector-{}", + Self::sanitize_list_component(&values) + )); + } + if let Some(project) = &filters.project { + parts.push(format!( + "project-{}", + Self::sanitize_slug_component(project) + )); + } + if !filters.prefix.is_empty() { + parts.push(format!("prefix-{}", Self::hash8(&filters.prefix.join(",")))); + } + if let Some(as_path) = &filters.as_path { + parts.push(format!("aspath-{}", Self::hash8(as_path))); + } + if filters.full_feed_only { + parts.push("fullfeed".to_string()); + } + + let slug = parts.join("-"); + if slug.len() <= 96 { + return Ok(slug); + } + + let truncated = slug + .chars() + .take(80) + .collect::() + .trim_end_matches('-') + .to_string(); + Ok(format!("{}-h{}", truncated, Self::hash8(&slug))) + } + + fn sanitize_list_component(values: &[String]) -> String { + let mut normalized = values + .iter() + .map(|value| Self::sanitize_slug_component(value)) + .collect::>(); + normalized.sort(); + normalized.join("+") + } + + fn sanitize_slug_component(input: &str) -> String { + input + .to_ascii_lowercase() + .chars() + .map(|ch| if ch.is_ascii_alphanumeric() { ch } else { '_' }) + .collect::() + .trim_matches('_') + .to_string() + } + + fn hash8(input: &str) -> String { + let mut hash = 0xcbf29ce484222325_u64; + for byte in input.as_bytes() { + hash ^= u64::from(*byte); + hash = hash.wrapping_mul(0x100000001b3); + } + format!("{:08x}", hash & 0xffff_ffff) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn base_args() -> RibArgs { + RibArgs { + filters: RibFilters { + rib_ts: vec!["2025-09-01T12:00:00Z".to_string()], + ..Default::default() + }, + output_type: None, + output_dir: None, + } + } + + #[test] + fn test_validate_multi_ts_stdout_error() { + let mut args = base_args(); + args.filters.rib_ts.push("2025-09-01T13:00:00Z".to_string()); + assert!(args.validate().is_err()); + } + + #[test] + fn test_validate_multi_ts_file_output_ok() -> Result<()> { + let mut args = base_args(); + args.filters.rib_ts.push("2025-09-01T13:00:00Z".to_string()); + args.output_type = Some(RibOutputType::Sqlite); + let values = args.validate()?; + assert_eq!(values.len(), 2); + Ok(()) + } + + #[test] + fn test_filter_slug_order() -> Result<()> { + let mut args = base_args(); + args.filters.country = Some("IR".to_string()); + args.filters.origin_asn = vec!["15169".to_string(), "13335".to_string()]; + args.filters.peer_asn = vec!["2914".to_string()]; + args.filters.collector = Some("rrc00,route-views2".to_string()); + args.filters.project = Some("riperis".to_string()); + args.filters.prefix = vec!["1.1.1.0/24".to_string()]; + args.filters.as_path = Some("^15169 ".to_string()); + args.filters.full_feed_only = true; + + let db = MonocleDatabase::open_in_memory()?; + let config = MonocleConfig::default(); + let lens = RibLens::new(&db, &config); + let slug = lens.filter_slug(&args.filters)?; + + assert!(slug + .starts_with("country-ir-origin-13335+15169-peer-2914-collector-route_views2+rrc00")); + assert!(slug.contains("-h")); + Ok(()) + } + + #[test] + fn test_hash8_is_stable() { + assert_eq!(RibLens::hash8("a"), RibLens::hash8("a")); + } + + #[test] + fn test_single_snapshot_file_name_includes_filters() -> Result<()> { + let mut args = base_args(); + args.filters.country = Some("US".to_string()); + args.filters.origin_asn = vec!["13335".to_string()]; + args.filters.full_feed_only = true; + + let db = MonocleDatabase::open_in_memory()?; + let config = MonocleConfig::default(); + let lens = RibLens::new(&db, &config); + let file_name = + lens.single_snapshot_file_name(&args, 1_756_728_000, RibOutputType::Sqlite)?; + + assert_eq!( + file_name, + "monocle-rib-20250901T120000Z-country-us-origin-13335-fullfeed.sqlite3" + ); + Ok(()) + } +} From 018906c994d3d4c107c12f866e66597c1c1bee3f Mon Sep 17 00:00:00 2001 From: Mingwei Zhang Date: Sun, 15 Mar 2026 12:15:18 -0700 Subject: [PATCH 2/2] fix: infer rib sqlite output from sqlite path - remove the rib output-type parameter and use --sqlite-path to enable SQLite output - require --sqlite-path for multi-timestamp rib runs - update README and changelog for the revised rib CLI --- CHANGELOG.md | 5 ++- README.md | 12 +++---- src/bin/commands/rib.rs | 38 ++++++++-------------- src/lens/rib/mod.rs | 71 ++++++++--------------------------------- 4 files changed, 34 insertions(+), 92 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 861a05c..c2bc973 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,11 +8,10 @@ All notable changes to this project will be documented in this file. * Added `monocle rib` for reconstructing RIB state at arbitrary timestamps * Selects the latest RIB before each requested `rib_ts` and replays updates to the exact timestamp - * Supports stdout output by default and merged SQLite output - * Repeated `--ts` values are written to one merged SQLite file keyed by `rib_ts` + * Supports stdout output by default and SQLite output via `--sqlite-path` + * Repeated `--ts` values require `--sqlite-path` and are written to one merged SQLite file keyed by `rib_ts` * Aborts when no RIB exists at or before a requested `rib_ts` for a selected collector * Supports `--country`, `--origin-asn`, `--prefix`, `--as-path`, `--peer-asn`, `--collector`, `--project`, and `--full-feed-only` - * Auto-generated output filenames include requested timestamps and normalized filter slugs ### Code Improvements diff --git a/README.md b/README.md index 946af0d..62eecfd 100644 --- a/README.md +++ b/README.md @@ -730,8 +730,7 @@ Options: -c, --collector Filter by collector, e.g., rrc00 or route-views2 -P, --project Filter by route collection project, i.e. riperis or routeviews --full-feed-only Keep only full-feed peers based on broker peer metadata - --output-type File output type. If omitted and `--output-dir` is also omitted, output goes to stdout [possible values: sqlite] - --output-dir Output directory for generated SQLite files + --sqlite-path SQLite output file path -h, --help Print help -V, --version Print version ``` @@ -739,7 +738,8 @@ Options: Behavior: - A single `--ts` writes to stdout by default. -- Repeated `--ts` values require file output and are written to one merged SQLite file keyed by `rib_ts`. +- Repeated `--ts` values require `--sqlite-path` and are written to one merged SQLite file keyed by `rib_ts`. +- Providing `--sqlite-path` writes the reconstructed results to that SQLite file instead of stdout. - If any selected collector has no RIB at or before a requested `rib_ts`, the command aborts instead of producing a partial result. - `--country` uses local ASInfo registration data, and `--full-feed-only` keeps only peers with at least 800k IPv4 prefixes or 100k IPv6 prefixes in broker peer metadata. @@ -753,13 +753,13 @@ monocle rib --ts 2025-09-01T12:00:00Z -c rrc00 -o 13335 monocle rib \ --ts 2025-09-01T12:00:00Z \ --ts 2025-09-01T18:00:00Z \ - --output-type sqlite \ + --sqlite-path /tmp/rrc00-us.sqlite3 \ -c rrc00 \ --country US \ --full-feed-only -# Override the output directory -monocle rib --ts 2025-09-01T12:00:00Z --output-dir /tmp/rib-out -c route-views2 +# Write a single reconstructed snapshot to SQLite +monocle rib --ts 2025-09-01T12:00:00Z --sqlite-path /tmp/route-views2.sqlite3 -c route-views2 ``` ### `monocle time` diff --git a/src/bin/commands/rib.rs b/src/bin/commands/rib.rs index 1aef1e8..b1c2b5b 100644 --- a/src/bin/commands/rib.rs +++ b/src/bin/commands/rib.rs @@ -1,12 +1,12 @@ use std::fs; use std::io::Write; -use std::path::{Path, PathBuf}; +use std::path::Path; use anyhow::{anyhow, Result}; use bgpkit_parser::BgpElem; use monocle::database::{MonocleDatabase, RibSqliteStore}; -use monocle::lens::rib::{RibLens, RibOutputType}; +use monocle::lens::rib::RibLens; use monocle::utils::{OutputFormat, TimestampFormat}; use monocle::MonocleConfig; @@ -52,9 +52,10 @@ fn run_inner( .map_err(|e| anyhow!("Failed to open database '{}': {}", sqlite_path, e))?; let lens = RibLens::new(&db, config); - match args.file_output_type() { - None => run_stdout(&lens, &args, output_format, no_update), - Some(RibOutputType::Sqlite) => run_sqlite_output(&lens, &args, no_update), + if args.sqlite_path.is_some() { + run_sqlite_output(&lens, &args, no_update) + } else { + run_stdout(&lens, &args, output_format, no_update) } } @@ -115,16 +116,15 @@ fn run_stdout( } fn run_sqlite_output(lens: &RibLens<'_>, args: &RibArgs, no_update: bool) -> Result<()> { - let normalized_ts = args.validate()?; - let output_dir = ensure_output_dir(lens.output_directory(args)?)?; - let output_path = output_dir.join(format!( - "{}.sqlite3", - lens.file_name_prefix(args, &normalized_ts)? - )); + args.validate()?; + let output_path = args + .sqlite_path + .as_deref() + .ok_or_else(|| anyhow!("Missing --sqlite-path for SQLite output"))?; - remove_existing_file(&output_path)?; + remove_existing_file(output_path)?; - let sqlite_store = RibSqliteStore::new(path_to_str(&output_path)?, true)?; + let sqlite_store = RibSqliteStore::new(path_to_str(output_path)?, true)?; let summary = lens.reconstruct_snapshots(args, no_update, |rib_ts, state_store| { state_store.visit_entries(|entry| sqlite_store.insert_entry(rib_ts, &entry)) })?; @@ -137,18 +137,6 @@ fn run_sqlite_output(lens: &RibLens<'_>, args: &RibArgs, no_update: bool) -> Res Ok(()) } -fn ensure_output_dir(path: Option) -> Result { - let output_dir = path.ok_or_else(|| anyhow!("Failed to resolve output directory"))?; - fs::create_dir_all(&output_dir).map_err(|e| { - anyhow!( - "Failed to create output directory '{}': {}", - output_dir.display(), - e - ) - })?; - Ok(output_dir) -} - fn remove_existing_file(path: &Path) -> Result<()> { match fs::remove_file(path) { Ok(()) => Ok(()), diff --git a/src/lens/rib/mod.rs b/src/lens/rib/mod.rs index f40b923..11f7b83 100644 --- a/src/lens/rib/mod.rs +++ b/src/lens/rib/mod.rs @@ -6,6 +6,7 @@ //! 3. Materializing only the final route state for each requested `rib_ts` use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; +use std::path::PathBuf; use anyhow::{anyhow, Result}; use bgpkit_broker::{BgpkitBroker, BrokerItem}; @@ -22,7 +23,7 @@ use crate::lens::parse::ParseFilters; use crate::lens::time::TimeLens; #[cfg(feature = "cli")] -use clap::{Args, ValueEnum}; +use clap::Args; const FULL_FEED_V4_THRESHOLD: u32 = 800_000; const FULL_FEED_V6_THRESHOLD: u32 = 100_000; @@ -31,12 +32,6 @@ const UPDATES_LOOKAHEAD_HOURS: i64 = 2; type FullFeedAllowlists = HashMap>; -#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] -#[cfg_attr(feature = "cli", derive(ValueEnum))] -pub enum RibOutputType { - Sqlite, -} - #[derive(Debug, Clone, Default, Serialize, Deserialize)] #[cfg_attr(feature = "cli", derive(Args))] pub struct RibFilters { @@ -99,13 +94,9 @@ pub struct RibArgs { #[serde(flatten)] pub filters: RibFilters, - /// File output type. If omitted and `--output-dir` is also omitted, output goes to stdout. - #[cfg_attr(feature = "cli", clap(long, value_enum))] - pub output_type: Option, - - /// Output directory for generated SQLite files. + /// SQLite output file path. #[cfg_attr(feature = "cli", clap(long))] - pub output_dir: Option, + pub sqlite_path: Option, } impl RibArgs { @@ -128,14 +119,6 @@ impl RibArgs { Ok(timestamps.into_iter().collect()) } - pub fn file_output_type(&self) -> Option { - match (self.output_type, self.output_dir.is_some()) { - (Some(output_type), _) => Some(output_type), - (None, true) => Some(RibOutputType::Sqlite), - (None, false) => None, - } - } - pub fn validate(&self) -> Result> { let normalized_ts = self.normalized_rib_ts()?; @@ -155,10 +138,8 @@ impl RibArgs { .map_err(|e| anyhow!("Invalid --as-path regex '{}': {}", as_path, e))?; } - if normalized_ts.len() > 1 && self.file_output_type().is_none() { - return Err(anyhow!( - "Multiple --ts values require file output. Use --output-type and optionally --output-dir." - )); + if normalized_ts.len() > 1 && self.sqlite_path.is_none() { + return Err(anyhow!("Multiple --ts values require --sqlite-path.")); } Ok(normalized_ts) @@ -323,20 +304,6 @@ impl<'a> RibLens<'a> { }) } - pub fn output_directory(&self, args: &RibArgs) -> Result> { - match args.file_output_type() { - None => Ok(None), - Some(_) => { - let dir = match &args.output_dir { - Some(path) => std::path::PathBuf::from(path), - None => std::env::current_dir() - .map_err(|e| anyhow!("Failed to determine current directory: {}", e))?, - }; - Ok(Some(dir)) - } - } - } - pub fn file_name_prefix(&self, args: &RibArgs, rib_ts: &[i64]) -> Result { let base = if rib_ts.len() == 1 { format!( @@ -367,19 +334,6 @@ impl<'a> RibLens<'a> { } } - pub fn single_snapshot_file_name( - &self, - args: &RibArgs, - rib_ts: i64, - output_type: RibOutputType, - ) -> Result { - let prefix = self.file_name_prefix(args, &[rib_ts])?; - let ext = match output_type { - RibOutputType::Sqlite => "sqlite3", - }; - Ok(format!("{}.{}", prefix, ext)) - } - fn resolve_country_asns( &self, country: Option<&str>, @@ -1071,8 +1025,7 @@ mod tests { rib_ts: vec!["2025-09-01T12:00:00Z".to_string()], ..Default::default() }, - output_type: None, - output_dir: None, + sqlite_path: None, } } @@ -1087,7 +1040,7 @@ mod tests { fn test_validate_multi_ts_file_output_ok() -> Result<()> { let mut args = base_args(); args.filters.rib_ts.push("2025-09-01T13:00:00Z".to_string()); - args.output_type = Some(RibOutputType::Sqlite); + args.sqlite_path = Some(PathBuf::from("/tmp/monocle-rib.sqlite3")); let values = args.validate()?; assert_eq!(values.len(), 2); Ok(()) @@ -1122,7 +1075,7 @@ mod tests { } #[test] - fn test_single_snapshot_file_name_includes_filters() -> Result<()> { + fn test_file_name_prefix_includes_filters() -> Result<()> { let mut args = base_args(); args.filters.country = Some("US".to_string()); args.filters.origin_asn = vec!["13335".to_string()]; @@ -1131,8 +1084,10 @@ mod tests { let db = MonocleDatabase::open_in_memory()?; let config = MonocleConfig::default(); let lens = RibLens::new(&db, &config); - let file_name = - lens.single_snapshot_file_name(&args, 1_756_728_000, RibOutputType::Sqlite)?; + let file_name = format!( + "{}.sqlite3", + lens.file_name_prefix(&args, &[1_756_728_000])? + ); assert_eq!( file_name,