diff --git a/.gitignore b/.gitignore index 01d8dbd..9709172 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,6 @@ venv .idea *.bz2 +*.gz +*.json Cargo.lock diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..48d1bfc --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,208 @@ +# Agentic Coding Guidelines for peer-stats + +This document provides guidelines for AI agents working on the peer-stats codebase. + +## Build, Test, and Lint Commands + +### Essential Commands +```bash +# Format code (run after any code changes) +cargo fmt + +# Build the project +cargo build + +# Build with all features +cargo build --all-features + +# Run all tests +cargo test + +# Run a specific test +cargo test test_dedup # by function name +cargo test tests::test_dedup # full path + +# Run clippy (must pass with zero warnings) +cargo clippy --all-features -- -D warnings + +# Full verification (run before committing) +cargo fmt && cargo build && cargo test && cargo clippy --all-features -- -D warnings +``` + +### Binaries +The project includes 5 binaries defined in `Cargo.toml`: +- `peer-stats-single-file` - single file processor +- `peer-stats-bootstrap` - bootstrap operation +- `peer-stats-index` - index peer stats +- `as2rel-index` - index AS relationships +- `pfx2as-index` - index prefix-to-AS mappings + +Build specific binary: `cargo build --bin peer-stats-single-file` + +## Code Style Guidelines + +### General Principles +- Keep modules focused on single concerns +- Use processor pattern: `new()` → `process_*()` → `into_*()` +- Prefer composition over inheritance +- Keep public API surface minimal - don't expose internal implementation details + +### Formatting +- Use `cargo fmt` to auto-format code +- Maximum line length: standard Rust (100 chars soft limit) +- Use 4 spaces for indentation +- Group imports: stdlib, external crates, then internal modules + +### Imports +Order imports as follows: +1. Standard library imports +2. External crate imports (alphabetically) +3. 
Internal crate imports + +Example: +```rust +use std::collections::{HashMap, HashSet}; +use std::net::IpAddr; + +use ipnet::IpNet; +use serde::{Deserialize, Serialize}; + +use crate::types::SomeType; +``` + +### Naming Conventions +- **Structs**: PascalCase (e.g., `PeerStatsProcessor`, `As2RelCount`) +- **Functions/Methods**: snake_case (e.g., `process_path`, `into_peer_info`) +- **Constants**: SCREAMING_SNAKE_CASE (e.g., `TIER1`, `TIER1_V4`) +- **Variables**: snake_case (e.g., `peer_ip`, `as_path`) +- **Type parameters**: PascalCase, single letters preferred + +### Types +- Use explicit types for public API +- Prefer `u32` for ASN values +- Use `usize` for counts and indices +- Use `Option` for nullable values +- Use `Result` for fallible operations (prefer `anyhow::Result`) + +### Error Handling +- Use `anyhow::Result` for most operations +- Use `?` operator for error propagation +- Avoid unwrap/expect in production code; use proper error handling +- Add context to errors when crossing module boundaries + +### Module Structure +Each processor module should contain: +1. Types (structs with derives) +2. Processor struct with: + - `new()` constructor + - `process_*()` methods for accumulating data + - `into_*()` method for finalizing results + - `Default` implementation + +Example structure: +```rust +// 1. Public types +#[derive(Debug, Clone, Serialize)] +pub struct SomeInfo { ... } + +// 2. Processor struct +pub struct SomeProcessor { ... } + +impl SomeProcessor { + pub fn new() -> Self { ... } + pub fn process_element(&mut self, ...) { ... } + pub fn into_results(self, ...) -> SomeInfo { ... 
} +} + +impl Default for SomeProcessor { + fn default() -> Self { Self::new() } +} +``` + +### Documentation +- Add doc comments (`///`) for all public items +- Document struct fields with `///` +- Use proper grammar in documentation +- Keep doc comments factual and concise + +### Serializing/Deserializing +Use derives in this order: `#[derive(Debug, Clone, Serialize)]` or `#[derive(Debug, Clone, Serialize, Deserialize)]` + +### Testing +- Place tests in `#[cfg(test)]` module at end of file +- Use descriptive test names: `test_dedup`, `test_read_rib` +- Include edge cases (empty collections, single items) +- Use `tracing` for logging in tests if needed + +### Constants +- Define at module level, before types +- Use arrays with explicit sizes: `[u32; 16]` not `[u32]` +- Document what the constants represent + +### Version Control +- Don't commit test output files (*.json) +- Run full verification before committing +- Keep commits atomic (one logical change per commit) +- Use present tense in commit messages (e.g., "Add feature", not "Added feature") + +### Dependencies +Key dependencies to know: +- `bgpkit-parser` - BGP/MRT parsing +- `serde` / `serde_json` - Serialization +- `anyhow` - Error handling +- `ipnet` - IP network types +- `clap` - CLI argument parsing +- `rusqlite` - SQLite database +- `tracing` - Logging + +### CI/CD Compliance +The GitHub Actions workflow (`.github/workflows/build.yml`) runs: +1. `cargo build --verbose` +2. `cargo clippy --all-features -- -D warnings` + +All PRs must pass these checks with zero warnings. 
 + +## Project Structure +``` +src/ +├── lib.rs # Main library with orchestration logic +├── as2rel.rs # AS relationship processing +├── peer_stats.rs # Peer statistics processing +├── pfx2as.rs # Prefix-to-AS mapping +└── bin/ # CLI binaries + ├── single-file.rs + ├── bootstrap.rs + ├── index-peer-stats.rs + ├── index-as2rel.rs + └── index-pfx2as.rs +``` + +## Common Patterns + +### Processor Pattern +```rust +let mut processor = SomeProcessor::new(); +for item in items { + processor.process_element(item); +} +let result = processor.into_results(...); +``` + +### Error Propagation +```rust +use anyhow::Result; + +pub fn some_function() -> Result<Output> { + let data = fetch_data()?; + process(data)?; + Ok(output) +} +``` + +### Match with Options +```rust +match some_option { + None => {} + Some(value) => { /* process */ } +} +``` diff --git a/CHANGELOG.md b/CHANGELOG.md index 45c1271..c2bb94f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,25 @@ All notable changes to this project will be documented in this file.
+## Unreleased + +### Code Refactoring + +* Refactored lib.rs into dedicated modules (as2rel, peer_stats, pfx2as) with processor pattern +* Moved types and constants into their corresponding processor modules +* Removed unnecessary internal function exports from public API + +### Algorithm Changes + +* Added two-tier transit detection: `TRUE_TIER1` (14 ASes always valid) vs candidate tier-1 ASes (Zayo, Hurricane Electric) that are only valid transit providers when their next hop is also a tier-1 +* Hurricane Electric (AS 6939) is treated as a candidate tier-1 for IPv6 only, reducing its downstream count by ~70% at tested collectors +* Zayo (AS 6461) is treated as a candidate tier-1 for both IPv4 and IPv6, reducing its downstream count by 14-18% + +### Bug Fixes + +* Removed AS 1239 (Sprint) from tier-1 ASN list to match bgp.tools definition +* Removed unnecessary ASN 0 placeholder from TIER1_V4 array + ## v0.2.1 - 2025-04-09 ### Highlights diff --git a/src/as2rel.rs b/src/as2rel.rs new file mode 100644 index 0000000..8f29758 --- /dev/null +++ b/src/as2rel.rs @@ -0,0 +1,359 @@ +use ipnet::IpNet; +use itertools::Itertools; +use serde::{Deserialize, Serialize}; +use std::collections::{HashMap, HashSet}; +use std::net::IpAddr; + +/// True Tier-1 ASes that always provide transit service. +/// These are the major transit providers that definitively sell upstream connectivity. +pub const TRUE_TIER1: [u32; 14] = [ + 6762, // Sparkle + 12956, // Telefonica + 2914, // NTT + 3356, // Lumen + 6453, // TATA + 701, // Verizon + 3257, // GTT + 1299, // Telia + 3491, // PCCW + 7018, // AT&T + 3320, // DTAG + 5511, // Orange + 6830, // Liberty Global + 174, // Cogent +]; + +/// Candidate Tier-1 ASes for IPv4 that may provide transit. +/// These are only considered transit providers if their next hop is another tier-1 AS. +/// This prevents over-counting downstream ASes for networks that don't actually sell transit. 
+pub const CANDIDATE_TIER1_V4: [u32; 1] = [ + 6461, // Zayo - only provides transit if connecting to another tier-1 +]; + +/// Candidate Tier-1 ASes for IPv6 that may provide transit. +/// Same logic as CANDIDATE_TIER1_V4 but for IPv6 paths. +pub const CANDIDATE_TIER1_V6: [u32; 2] = [ + 6461, // Zayo - IPv6 + 6939, // Hurricane Electric (IPv6 only) - only provides transit if connecting to another tier-1 +]; + +/// Combined Tier-1 lists for backward compatibility and global processing. +/// These include both true tier-1s and candidates. +pub const TIER1: [u32; 16] = [ + 6762, 12956, 2914, 3356, 6453, 701, 6461, 3257, 1299, 3491, 7018, 3320, 5511, 6830, 174, 6939, +]; + +pub const TIER1_V4: [u32; 15] = [ + 6762, 12956, 2914, 3356, 6453, 701, 6461, 3257, 1299, 3491, 7018, 3320, 5511, 6830, 174, +]; + +pub const TIER1_V6: [u32; 16] = [ + 6762, 12956, 2914, 3356, 6453, 701, 6461, 3257, 1299, 3491, 7018, 3320, 5511, 6830, 174, 6939, +]; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct As2Rel { + pub project: String, + pub collector: String, + pub rib_dump_url: String, + /// AS relationship mapping: Vec<As2RelCount> + pub as2rel: Vec<As2RelCount>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct As2RelCount { + pub asn1: u32, + pub asn2: u32, + /// 0 - adjacency (undirected), 1 - asn1 is upstream of asn2 + pub rel: u8, + /// number of paths having this relationship + pub paths_count: usize, + /// number of peers seeing this relationship + pub peers_count: usize, +} + +/// Remove consecutive duplicate ASNs from an AS path. +/// This handles path prepending where the same AS appears multiple times in a row. +pub fn dedup_path(path: Vec<u32>) -> Vec<u32> { + if path.len() <= 1 { + return path; + } + + let mut new_path = vec![path[0]]; + + for (asn1, asn2) in path.into_iter().tuple_windows::<(u32, u32)>() { + if asn1 != asn2 { + new_path.push(asn2) + } + } + new_path +} + +/// Count peer-to-peer relationships (adjacent ASes in path).
+/// This records all direct connections between ASes with rel=0 (unknown). +fn count_peer_relationships( + peer_ip: IpAddr, + as_path: &[u32], + data_map: &mut HashMap<(u32, u32, u8), (usize, HashSet<IpAddr>)>, +) { + for (asn1, asn2) in as_path.iter().tuple_windows::<(&u32, &u32)>() { + let (msg_count, peers) = data_map + .entry((*asn1, *asn2, 0)) + .or_insert((0, HashSet::new())); + *msg_count += 1; + peers.insert(peer_ip); + } +} + +/// Determine the transit point in an AS path using the tier-1 algorithm. +/// +/// Algorithm: +/// 1. Look for the first TRUE_TIER1 AS - this is always a valid transit provider +/// 2. If we encounter a CANDIDATE_TIER1 AS first, check if its next hop is another tier-1 AS +/// - If yes: this candidate is a valid transit provider +/// - If no: continue looking for the next tier-1 +/// 3. Return the index of the first valid transit point, or None if none found +/// +/// This prevents over-counting downstream ASes for networks like Zayo (6461) and +/// Hurricane Electric (6939) that don't actually sell transit service unless they're +/// connecting to another tier-1. +pub fn find_transit_point( + as_path: &[u32], + true_tier1: &[u32], + all_tier1_set: &HashSet<u32>, +) -> Option<usize> { + for (i, asn) in as_path.iter().enumerate() { + // True tier-1: always valid transit + if true_tier1.contains(asn) { + return Some(i); + } + + // Candidate tier-1: only valid if next hop is also tier-1 + if all_tier1_set.contains(asn) + && !true_tier1.contains(asn) + && i + 1 < as_path.len() + && all_tier1_set.contains(&as_path[i + 1]) + { + return Some(i); + } + } + + None +} + +/// Update AS relationship map with provider-customer relationships. +/// +/// Uses the tier-1 transit algorithm to determine which ASes are upstream providers. +/// Only ASes between the origin and the first valid transit point are marked as +/// customer->provider relationships (rel=1).
+pub fn update_as2rel_map( + peer_ip: IpAddr, + true_tier1: &[u32], + all_tier1: &[u32], + data_map: &mut HashMap<(u32, u32, u8), (usize, HashSet<IpAddr>)>, + // input AS path must be from collector ([0]) to origin ([last]) + original_as_path: &[u32], +) { + let mut as_path = original_as_path.to_vec(); + + // Count peer relationships first + count_peer_relationships(peer_ip, &as_path, data_map); + + // Reverse to process from origin towards collector + as_path.reverse(); + + // Build tier-1 lookup set once + let all_tier1_set: HashSet<u32> = all_tier1.iter().copied().collect(); + + // Find the transit point using the tier-1 algorithm + if let Some(transit_idx) = find_transit_point(&as_path, true_tier1, &all_tier1_set) { + // Mark all ASes from origin up to (but not including) the transit point + // as customer -> provider relationships + if transit_idx < as_path.len() - 1 { + for i in 0..transit_idx { + let customer = as_path[i]; + let provider = as_path[i + 1]; + let (msg_count, peers) = data_map + .entry((provider, customer, 1)) + .or_insert((0, HashSet::new())); + *msg_count += 1; + peers.insert(peer_ip); + } + } + } +} + +/// Compile raw relationship data into As2RelCount structs. +pub fn compile_as2rel_count( + data_map: &HashMap<(u32, u32, u8), (usize, HashSet<IpAddr>)>, +) -> Vec<As2RelCount> { + data_map + .iter() + .map(|((asn1, asn2, rel), (msg_count, peers))| As2RelCount { + asn1: *asn1, + asn2: *asn2, + rel: *rel, + paths_count: *msg_count, + peers_count: peers.len(), + }) + .collect() +} + +/// Processor for collecting AS relationship data from BGP RIB dumps. +/// +/// Uses the tier-1 transit algorithm to distinguish between: +/// - True tier-1 ASes that always provide transit +/// - Candidate tier-1 ASes (Zayo, Hurricane Electric) that only provide transit +/// when connecting to another tier-1 +/// +/// This prevents over-counting downstream ASes for networks that don't actually +/// sell transit service in the traditional sense.
+pub struct As2RelProcessor { + as2rel_map: HashMap<(u32, u32, u8), (usize, HashSet<IpAddr>)>, + as2rel_v4_map: HashMap<(u32, u32, u8), (usize, HashSet<IpAddr>)>, + as2rel_v6_map: HashMap<(u32, u32, u8), (usize, HashSet<IpAddr>)>, +} + +impl As2RelProcessor { + pub fn new() -> Self { + Self { + as2rel_map: HashMap::new(), + as2rel_v4_map: HashMap::new(), + as2rel_v6_map: HashMap::new(), + } + } + + /// Process a single AS path and update relationship statistics. + /// + /// For IPv4: Uses TRUE_TIER1 and CANDIDATE_TIER1_V4 + /// For IPv6: Uses TRUE_TIER1 and CANDIDATE_TIER1_V6 + /// Global: Uses combined TIER1 lists + pub fn process_path(&mut self, peer_ip: IpAddr, prefix_type: IpNet, as_path: &[u32]) { + // Global as2rel (all paths) - uses old algorithm for compatibility + // Uses combined TIER1 list without candidate validation + update_as2rel_map(peer_ip, &TIER1, &TIER1, &mut self.as2rel_map, as_path); + + // v4/v6 specific as2rel - uses new tier-1 transit algorithm + match prefix_type { + IpNet::V4(_) => { + update_as2rel_map( + peer_ip, + &TRUE_TIER1, + &TIER1_V4, + &mut self.as2rel_v4_map, + as_path, + ); + } + IpNet::V6(_) => { + update_as2rel_map( + peer_ip, + &TRUE_TIER1, + &TIER1_V6, + &mut self.as2rel_v6_map, + as_path, + ); + } + } + } + + /// Convert collected data into As2Rel structs.
+ pub fn into_as2rel_triple( + self, + project: &str, + collector: &str, + rib_dump_url: &str, + ) -> (As2Rel, As2Rel, As2Rel) { + let as2rel_global = compile_as2rel_count(&self.as2rel_map); + let as2rel_v4 = compile_as2rel_count(&self.as2rel_v4_map); + let as2rel_v6 = compile_as2rel_count(&self.as2rel_v6_map); + + ( + As2Rel { + project: project.to_string(), + collector: collector.to_string(), + rib_dump_url: rib_dump_url.to_string(), + as2rel: as2rel_global, + }, + As2Rel { + project: project.to_string(), + collector: collector.to_string(), + rib_dump_url: rib_dump_url.to_string(), + as2rel: as2rel_v4, + }, + As2Rel { + project: project.to_string(), + collector: collector.to_string(), + rib_dump_url: rib_dump_url.to_string(), + as2rel: as2rel_v6, + }, + ) + } +} + +impl Default for As2RelProcessor { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::HashSet; + + #[test] + fn test_find_transit_point_true_tier1() { + // True tier-1 should always be transit point + let path = vec![100, 200, 174, 300]; // 174 (Cogent) is true tier-1 + let all_tier1_set: HashSet<u32> = TIER1.iter().copied().collect(); + let result = find_transit_point(&path, &TRUE_TIER1, &all_tier1_set); + assert_eq!(result, Some(2)); // Index of 174 + } + + #[test] + fn test_find_transit_point_candidate_with_tier1_next() { + // 6461 (Zayo) with tier-1 next hop should be transit + let path = vec![100, 200, 6461, 174, 300]; // 6461 -> 174 (tier-1) + let all_tier1_set: HashSet<u32> = TIER1_V4.iter().copied().collect(); + let result = find_transit_point(&path, &TRUE_TIER1, &all_tier1_set); + assert_eq!(result, Some(2)); // Index of 6461 + } + + #[test] + fn test_find_transit_point_candidate_without_tier1_next() { + // 6461 without tier-1 next hop should NOT be transit + let path = vec![100, 200, 6461, 300, 400]; // 6461 -> 300 (not tier-1) + let all_tier1_set: HashSet<u32> = TIER1_V4.iter().copied().collect(); + let result = find_transit_point(&path,
 &TRUE_TIER1, &all_tier1_set); + assert_eq!(result, None); // No valid transit point + } + + #[test] + fn test_find_transit_point_true_tier1_after_candidate() { + // True tier-1 after candidate that doesn't transit + let path = vec![100, 200, 6461, 300, 174, 400]; + // 6461 at index 2 doesn't transit (300 not tier-1) + // 174 at index 4 is true tier-1 + let all_tier1_set: HashSet<u32> = TIER1_V4.iter().copied().collect(); + let result = find_transit_point(&path, &TRUE_TIER1, &all_tier1_set); + assert_eq!(result, Some(4)); // Index of 174 + } + + #[test] + fn test_find_transit_point_he_with_tier1_next_v6() { + // HE (6939) with tier-1 next hop should be valid transit (IPv6) + let path = vec![100, 200, 6939, 174, 300]; // 6939 -> 174 (tier-1) + let all_tier1_set: HashSet<u32> = TIER1_V6.iter().copied().collect(); + let result = find_transit_point(&path, &TRUE_TIER1, &all_tier1_set); + assert_eq!(result, Some(2)); // Index of 6939 + } + + #[test] + fn test_find_transit_point_he_without_tier1_next_v6() { + // HE (6939) without tier-1 next hop should NOT be transit (IPv6) + let path = vec![100, 200, 6939, 300, 400]; // 6939 -> 300 (not tier-1) + let all_tier1_set: HashSet<u32> = TIER1_V6.iter().copied().collect(); + let result = find_transit_point(&path, &TRUE_TIER1, &all_tier1_set); + assert_eq!(result, None); // No valid transit point + } +} diff --git a/src/lib.rs b/src/lib.rs index 36d0690..6abbe73 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,154 +1,23 @@ -#![allow(dead_code)] -use anyhow::Result; -use bgpkit_parser::BgpkitParser; -use ipnet::{IpNet, Ipv4Net, Ipv6Net}; -use itertools::Itertools; -use serde::{Deserialize, Serialize}; -use std::collections::{HashMap, HashSet}; -use std::net::IpAddr; - -#[derive(Debug, Clone, Serialize)] -pub struct RibPeerInfo { - pub project: String, - pub collector: String, - pub rib_dump_url: String, - pub peers: HashMap<IpAddr, PeerInfo>, -} - -#[derive(Debug, Clone, Serialize)] -pub struct PeerInfo { - pub ip: IpAddr, - pub asn: u32, - pub num_v4_pfxs: usize, - 
pub num_v6_pfxs: usize, - pub num_connected_asns: usize, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Prefix2As { - pub project: String, - pub collector: String, - pub rib_dump_url: String, - /// prefix to as mapping: > - pub pfx2as: Vec, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Prefix2AsCount { - pub prefix: String, - pub asn: u32, - pub count: usize, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct As2Rel { - pub project: String, - pub collector: String, - pub rib_dump_url: String, - /// prefix to as mapping: > - pub as2rel: Vec, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct As2RelCount { - pub asn1: u32, - pub asn2: u32, - /// 1 - asn1 is upstream of asn2, 2 - peer, 0 - unknown - pub rel: u8, - /// number of paths having this relationship - pub paths_count: usize, - /// number of peers seeing this relationship - pub peers_count: usize, -} - -const TIER1: [u32; 17] = [ - 6762, 12956, 2914, 3356, 6453, 1239, 701, 6461, 3257, 1299, 3491, 7018, 3320, 5511, 6830, 174, - 6939, -]; - -const TIER1_V4: [u32; 17] = [ - 6762, 12956, 2914, 3356, 6453, 1239, 701, 6461, 3257, 1299, 3491, 7018, 3320, 5511, 6830, 174, - 0, -]; - -const TIER1_V6: [u32; 17] = [ - 6762, 12956, 2914, 3356, 6453, 1239, 701, 6461, 3257, 1299, 3491, 7018, 3320, 5511, 6830, 174, - 6939, -]; - -fn dedup_path(path: Vec) -> Vec { - if path.len() <= 1 { - return path; - } - - let mut new_path = vec![path[0]]; - - for (asn1, asn2) in path.into_iter().tuple_windows::<(u32, u32)>() { - if asn1 != asn2 { - new_path.push(asn2) - } - } - new_path -} +pub mod as2rel; +pub mod peer_stats; +pub mod pfx2as; -fn update_as2rel_map( - peer_ip: IpAddr, - tier1: &[u32], - data_map: &mut HashMap<(u32, u32, u8), (usize, HashSet)>, - // input AS path must be from collector ([0]) to origin ([last]) - original_as_path: &[u32], -) { - let mut as_path = original_as_path.to_vec(); +// Re-export tier-1 constants from as2rel +pub use 
as2rel::{CANDIDATE_TIER1_V4, CANDIDATE_TIER1_V6, TIER1, TIER1_V4, TIER1_V6, TRUE_TIER1}; - // counting peer relationships - for (asn1, asn2) in as_path.iter().tuple_windows::<(&u32, &u32)>() { - let (msg_count, peers) = data_map - .entry((*asn1, *asn2, 0)) - .or_insert((0, HashSet::new())); - *msg_count += 1; - peers.insert(peer_ip); - } +// Re-export types from their respective modules +pub use as2rel::{As2Rel, As2RelCount}; +pub use peer_stats::{PeerInfo, RibPeerInfo}; +pub use pfx2as::{Prefix2As, Prefix2AsCount}; - // counting provider-customer relationships - as_path.reverse(); - let contains_tier1 = as_path.iter().any(|x| tier1.contains(x)); - if contains_tier1 { - let mut first_tier1: usize = usize::MAX; - for (i, asn) in as_path.iter().enumerate() { - if tier1.contains(asn) && first_tier1 == usize::MAX { - first_tier1 = i; - break; - } - } +// Re-export processors +pub use as2rel::{dedup_path, As2RelProcessor}; +pub use peer_stats::PeerStatsProcessor; +pub use pfx2as::Pfx2AsProcessor; - // origin to first tier 1 - if first_tier1 < as_path.len() - 1 { - for i in 0..first_tier1 { - let (asn1, asn2) = (as_path.get(i).unwrap(), as_path.get(i + 1).unwrap()); - let (msg_count, peers) = data_map - .entry((*asn2, *asn1, 1)) - .or_insert((0, HashSet::new())); - *msg_count += 1; - peers.insert(peer_ip); - } - } - } -} - -fn compile_as2rel_count( - data_map: &HashMap<(u32, u32, u8), (usize, HashSet)>, -) -> Vec { - data_map - .iter() - .map(|((asn1, asn2, rel), (msg_count, peers))| As2RelCount { - asn1: *asn1, - asn2: *asn2, - rel: *rel, - paths_count: *msg_count, - peers_count: peers.len(), - }) - .collect() -} +use anyhow::Result; +use bgpkit_parser::BgpkitParser; +use ipnet::IpNet; /// collect information from a provided RIB file /// @@ -165,147 +34,59 @@ pub fn parse_rib_file( project: &str, collector: &str, ) -> Result<(RibPeerInfo, Prefix2As, (As2Rel, As2Rel, As2Rel))> { - // peer-stats - let mut peer_asn_map: HashMap = HashMap::new(); - let mut peer_connection: 
HashMap> = HashMap::new(); - let mut peer_v4_pfxs_map: HashMap> = HashMap::new(); - let mut peer_v6_pfxs_map: HashMap> = HashMap::new(); - - // pfx2as - let mut pfx2as_map: HashMap<(String, u32), usize> = HashMap::new(); - - // as2rel - let mut as2rel_map: HashMap<(u32, u32, u8), (usize, HashSet)> = HashMap::new(); - let mut as2rel_v4_map: HashMap<(u32, u32, u8), (usize, HashSet)> = HashMap::new(); - let mut as2rel_v6_map: HashMap<(u32, u32, u8), (usize, HashSet)> = HashMap::new(); + let mut peer_stats_collector = PeerStatsProcessor::new(); + let mut pfx2as_collector = Pfx2AsProcessor::new(); + let mut as2rel_collector = As2RelProcessor::new(); for elem in BgpkitParser::new(file_url)? { - peer_asn_map - .entry(elem.peer_ip) - .or_insert(elem.peer_asn.to_u32()); + let peer_ip = elem.peer_ip; + let peer_asn = elem.peer_asn.to_u32(); + let prefix = elem.prefix.prefix; + + // Extract prefix info + let (prefix_v4, prefix_v6) = match prefix { + IpNet::V4(net) => (Some(net), None), + IpNet::V6(net) => (None, Some(net)), + }; + + // Process AS path data + let mut connected_asn = None; if let Some(as_path) = elem.as_path.clone() { if let Some(u32_path) = as_path.to_u32_vec_opt(true) { - // peer-stats - match u32_path.get(1) { - None => {} - Some(asn) => { - peer_connection - .entry(elem.peer_ip) - .or_default() - .insert(*asn); - } - }; + // Get connected ASN (second hop in path) + connected_asn = u32_path.get(1).copied(); - // pfx2as - match u32_path.last() { - None => {} - Some(asn) => { - let prefix = elem.prefix.to_string(); - let count = pfx2as_map.entry((prefix, *asn)).or_insert(0); - *count += 1; - } + // Get origin ASN for pfx2as + if let Some(origin_asn) = u32_path.last().copied() { + pfx2as_collector.record(prefix.to_string(), origin_asn); } - // do a global and a v4/v6 specific as2rel - for is_global in [true, false] { - // get tier-1 ASes list and the corresponding as2rel_map - let (tier1, data_map) = match is_global { - true => (TIER1.to_vec(), &mut 
as2rel_map), - false => match elem.prefix.prefix { - IpNet::V4(_) => (TIER1_V4.to_vec(), &mut as2rel_v4_map), - IpNet::V6(_) => (TIER1_V6.to_vec(), &mut as2rel_v6_map), - }, - }; - // update as2rel_map - update_as2rel_map(elem.peer_ip, &tier1, data_map, &u32_path); - } + // Process AS relationships + as2rel_collector.process_path(peer_ip, prefix, &u32_path); } } - match elem.prefix.prefix { - IpNet::V4(net) => { - peer_v4_pfxs_map - .entry(elem.peer_ip) - .or_default() - .insert(net); - } - IpNet::V6(net) => { - peer_v6_pfxs_map - .entry(elem.peer_ip) - .or_default() - .insert(net); - } - } - drop(elem); - } - - let mut peer_info_map: HashMap = HashMap::new(); - for (ip, asn) in peer_asn_map { - let num_v4_pfxs = peer_v4_pfxs_map.entry(ip).or_default().len(); - let num_v6_pfxs = peer_v6_pfxs_map.entry(ip).or_default().len(); - let num_connected_asn = peer_connection.entry(ip).or_default().len(); - let ip_clone = ip; - let asn_clone = asn; - peer_info_map.insert( - ip_clone, - PeerInfo { - ip: ip_clone, - asn: asn_clone, - num_v4_pfxs, - num_v6_pfxs, - num_connected_asns: num_connected_asn, - }, + // Update peer stats + peer_stats_collector.process_element( + peer_ip, + peer_asn, + prefix_v4, + prefix_v6, + connected_asn, ); } - let pfx2as = pfx2as_map - .into_iter() - .map(|((prefix, asn), count)| Prefix2AsCount { prefix, asn, count }) - .collect(); - - let as2rel_global = compile_as2rel_count(&as2rel_map); - let as2rel_v4 = compile_as2rel_count(&as2rel_v4_map); - let as2rel_v6 = compile_as2rel_count(&as2rel_v6_map); + let peer_info = peer_stats_collector.into_peer_info(project, collector, file_url); + let pfx2as = pfx2as_collector.into_prefix2as(project, collector, file_url); + let as2rel_triple = as2rel_collector.into_as2rel_triple(project, collector, file_url); - Ok(( - RibPeerInfo { - project: project.to_string(), - collector: collector.to_string(), - rib_dump_url: file_url.to_string(), - peers: peer_info_map, - }, - Prefix2As { - project: project.to_string(), 
- collector: collector.to_string(), - rib_dump_url: file_url.to_string(), - pfx2as, - }, - ( - As2Rel { - project: project.to_string(), - collector: collector.to_string(), - rib_dump_url: file_url.to_string(), - as2rel: as2rel_global, - }, - As2Rel { - project: project.to_string(), - collector: collector.to_string(), - rib_dump_url: file_url.to_string(), - as2rel: as2rel_v4, - }, - As2Rel { - project: project.to_string(), - collector: collector.to_string(), - rib_dump_url: file_url.to_string(), - as2rel: as2rel_v6, - }, - ), - )) + Ok((peer_info, pfx2as, as2rel_triple)) } #[cfg(test)] mod tests { - use super::*; + use crate::as2rel::dedup_path; + use crate::parse_rib_file; use serde_json::json; use std::fs::File; use tracing::{info, Level}; diff --git a/src/peer_stats.rs b/src/peer_stats.rs new file mode 100644 index 0000000..9e13565 --- /dev/null +++ b/src/peer_stats.rs @@ -0,0 +1,102 @@ +use ipnet::{Ipv4Net, Ipv6Net}; +use serde::Serialize; +use std::collections::{HashMap, HashSet}; +use std::net::IpAddr; + +#[derive(Debug, Clone, Serialize)] +pub struct RibPeerInfo { + pub project: String, + pub collector: String, + pub rib_dump_url: String, + pub peers: HashMap, +} + +#[derive(Debug, Clone, Serialize)] +pub struct PeerInfo { + pub ip: IpAddr, + pub asn: u32, + pub num_v4_pfxs: usize, + pub num_v6_pfxs: usize, + pub num_connected_asns: usize, +} + +pub struct PeerStatsProcessor { + peer_asn_map: HashMap, + peer_connection: HashMap>, + peer_v4_pfxs_map: HashMap>, + peer_v6_pfxs_map: HashMap>, +} + +impl PeerStatsProcessor { + pub fn new() -> Self { + Self { + peer_asn_map: HashMap::new(), + peer_connection: HashMap::new(), + peer_v4_pfxs_map: HashMap::new(), + peer_v6_pfxs_map: HashMap::new(), + } + } + + pub fn process_element( + &mut self, + peer_ip: IpAddr, + peer_asn: u32, + prefix_v4: Option, + prefix_v6: Option, + connected_asn: Option, + ) { + self.peer_asn_map.entry(peer_ip).or_insert(peer_asn); + + if let Some(asn) = connected_asn { + 
self.peer_connection.entry(peer_ip).or_default().insert(asn); + } + + if let Some(net) = prefix_v4 { + self.peer_v4_pfxs_map + .entry(peer_ip) + .or_default() + .insert(net); + } + + if let Some(net) = prefix_v6 { + self.peer_v6_pfxs_map + .entry(peer_ip) + .or_default() + .insert(net); + } + } + + pub fn into_peer_info(self, project: &str, collector: &str, rib_dump_url: &str) -> RibPeerInfo { + let mut peer_info_map: HashMap = HashMap::new(); + + for (ip, asn) in self.peer_asn_map { + let num_v4_pfxs = self.peer_v4_pfxs_map.get(&ip).map_or(0, |s| s.len()); + let num_v6_pfxs = self.peer_v6_pfxs_map.get(&ip).map_or(0, |s| s.len()); + let num_connected_asns = self.peer_connection.get(&ip).map_or(0, |s| s.len()); + + peer_info_map.insert( + ip, + PeerInfo { + ip, + asn, + num_v4_pfxs, + num_v6_pfxs, + num_connected_asns, + }, + ); + } + + RibPeerInfo { + project: project.to_string(), + collector: collector.to_string(), + rib_dump_url: rib_dump_url.to_string(), + peers: peer_info_map, + } + } +} + +impl Default for PeerStatsProcessor { + fn default() -> Self { + Self::new() + } +} diff --git a/src/pfx2as.rs b/src/pfx2as.rs new file mode 100644 index 0000000..47c5e5d --- /dev/null +++ b/src/pfx2as.rs @@ -0,0 +1,56 @@ +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Prefix2As { + pub project: String, + pub collector: String, + pub rib_dump_url: String, + /// prefix to as mapping: > + pub pfx2as: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Prefix2AsCount { + pub prefix: String, + pub asn: u32, + pub count: usize, +} + +pub struct Pfx2AsProcessor { + pfx2as_map: HashMap<(String, u32), usize>, +} + +impl Pfx2AsProcessor { + pub fn new() -> Self { + Self { + pfx2as_map: HashMap::new(), + } + } + + pub fn record(&mut self, prefix: String, asn: u32) { + let count = self.pfx2as_map.entry((prefix, asn)).or_insert(0); + *count += 1; + } + + pub fn into_prefix2as(self, 
project: &str, collector: &str, rib_dump_url: &str) -> Prefix2As { + let pfx2as = self + .pfx2as_map + .into_iter() + .map(|((prefix, asn), count)| Prefix2AsCount { prefix, asn, count }) + .collect(); + + Prefix2As { + project: project.to_string(), + collector: collector.to_string(), + rib_dump_url: rib_dump_url.to_string(), + pfx2as, + } + } +} + +impl Default for Pfx2AsProcessor { + fn default() -> Self { + Self::new() + } +}