From 10470199ab5847f252c51331da5b9df4d22e8ae1 Mon Sep 17 00:00:00 2001 From: Mingwei Zhang Date: Sat, 21 Mar 2026 09:49:40 -0700 Subject: [PATCH 1/2] feat: add RPKISPOOL data source with CCR format parsing Add support for loading historical RPKI data from RPKISPOOL archives (draft-snijders-rpkispool-format). Parses CCR files (draft-ietf-sidrops-rpki-ccr) from the rpkispool .tar.zst archive, which contains all validated ROA and ASPA payloads for one vantage point snapshot. - Add `bcder` and `zstd` dependencies under the `rpki` feature flag - Implement CCR DER parser using `bcder` for VRPs and VAPs extraction - Add `RpkiSpoolsCollector` enum with three mirrors (sobornost.net, attn.jp, kerfuffle.net) - Add `RpkiSpools` variant to `HistoricalRpkiSource` - Expose public `parse_ccr()` and `parse_rpkispools_archive()` functions - Add example demonstrating RPKISPOOL lookup for 1.1.1.0/24 --- CHANGELOG.md | 14 + Cargo.toml | 8 +- README.md | 11 +- examples/rpkispools.rs | 46 +++ src/lib.rs | 12 + src/rpki/mod.rs | 11 +- src/rpki/rpkispools.rs | 628 +++++++++++++++++++++++++++++++++++++++++ 7 files changed, 726 insertions(+), 4 deletions(-) create mode 100644 examples/rpkispools.rs create mode 100644 src/rpki/rpkispools.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 1d7f9c3..13483a1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,20 @@ All notable changes to this project will be documented in this file. 
## Unreleased +### Feature flags + +* Added `bcder` and `zstd` as optional dependencies under the `rpki` feature flag + +### New features + +* Added RPKISPOOL data source support ([draft-snijders-rpkispool-format](https://datatracker.ietf.org/doc/draft-snijders-rpkispool-format/)) + - Parses CCR ([draft-ietf-sidrops-rpki-ccr](https://datatracker.ietf.org/doc/draft-ietf-sidrops-rpki-ccr/)) files from RPKISPOOL `.tar.zst` archives + - Uses `bcder` for DER parsing instead of processing ~942K individual `.roa` files + - Added `RpkiSpoolsCollector` enum with three mirrors: `SobornostNet`, `AttnJp`, `KerfuffleNet` + - Public API: `parse_ccr()`, `parse_rpkispools_archive()`, `RpkiTrie::from_rpkispools()` + - Integrated into `BgpkitCommons::load_rpki_historical()`, `load_rpki_from_files()`, `list_rpki_files()` + - Added `rpkispools` example + ### Bug fixes * Fixed typo in `RpkiViewsCollector` enum variant: `SoborostNet` renamed to `SobornostNet` diff --git a/Cargo.toml b/Cargo.toml index c5475cf..d8c116d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,8 @@ regex = { version = "1", optional = true } serde_json = { version = "1", optional = true } tracing = { version = "0.1", optional = true } tar = { version = "0.4", optional = true } +zstd = { version = "0.13", optional = true } +bcder = { version = "0.7", optional = true } [dev-dependencies] tracing-subscriber = "0.3" @@ -42,7 +44,7 @@ as2rel = ["oneio", "serde_json", "tracing"] bogons = ["oneio", "ipnet", "regex", "chrono"] countries = ["oneio"] mrt_collectors = ["oneio", "chrono"] -rpki = ["oneio", "ipnet", "ipnet-trie", "chrono", "tracing", "tar", "serde_json"] +rpki = ["oneio", "ipnet", "ipnet-trie", "chrono", "tracing", "tar", "serde_json", "zstd", "bcder"] # Convenience feature to enable all modules all = ["asinfo", "as2rel", "bogons", "countries", "mrt_collectors", "rpki"] @@ -64,6 +66,10 @@ required-features = ["rpki"] name = "rpki_historical" required-features = ["rpki"] +[[example]] +name = "rpkispools" 
+required-features = ["rpki"] + [lints.clippy] uninlined_format_args = "allow" collapsible_if = "allow" diff --git a/README.md b/README.md index d61084c..ee37b52 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,7 @@ graph TD M3 -->|bogons_match| A3[IANA registries] M4 -->|country_by_code| A4[GeoNames] M5 -->|mrt_collectors_all| A5[RouteViews / RIPE RIS] - M6 -->|rpki_validate| A6[Cloudflare / RIPE NCC / RPKIviews] + M6 -->|rpki_validate| A6[Cloudflare / RIPE NCC / RPKIviews / RPKISPOOL] ``` Each module is gated by a feature flag. The `all` feature (default) enables everything. @@ -42,7 +42,7 @@ Data is fetched on the first `load_xxx()` call and kept in memory until `reload( | [`bogons`] | `bogons` | IANA special registries | `bogons_match`, `bogons_match_prefix`, `bogons_match_asn` | | [`countries`] | `countries` | GeoNames | `country_by_code`, `country_by_code3`, `country_by_name` | | [`mrt_collectors`] | `mrt_collectors` | RouteViews, RIPE RIS | `mrt_collectors_all`, `mrt_collector_peers_all` | -| [`rpki`] | `rpki` | Cloudflare, RIPE NCC, RPKIviews | `rpki_validate`, `rpki_validate_check_expiry`, `rpki_lookup_by_prefix` | +| [`rpki`] | `rpki` | Cloudflare, RIPE NCC, RPKIviews, RPKISPOOL | `rpki_validate`, `rpki_validate_check_expiry`, `rpki_lookup_by_prefix` | ## Quick Start @@ -99,10 +99,17 @@ commons.load_rpki_historical(date, HistoricalRpkiSource::Ripe).unwrap(); // Or from an RPKIviews collector let source = HistoricalRpkiSource::RpkiViews(RpkiViewsCollector::SobornostNet); commons.load_rpki_historical(date, source).unwrap(); + +// Or from RPKISPOOL (CCR format, parses faster) +use bgpkit_commons::rpki::RpkiSpoolsCollector; +let source = HistoricalRpkiSource::RpkiSpools(RpkiSpoolsCollector::default()); +commons.load_rpki_historical(date, source).unwrap(); ``` Available RPKIviews collectors: `SobornostNet` (default), `MassarsNet`, `AttnJp`, `KerfuffleNet`. +Available RPKISPOOL collectors: `SobornostNet` (default), `AttnJp`, `KerfuffleNet`. 
+ ### AS Information with Builder ```rust diff --git a/examples/rpkispools.rs b/examples/rpkispools.rs new file mode 100644 index 0000000..6759765 --- /dev/null +++ b/examples/rpkispools.rs @@ -0,0 +1,46 @@ +//! Example demonstrating RPKI data loading from RPKISPOOL archives (CCR format) +//! +//! Run with: cargo run --example rpkispools --features rpki + +use bgpkit_commons::BgpkitCommons; +use bgpkit_commons::rpki::{HistoricalRpkiSource, RpkiSpoolsCollector}; +use chrono::NaiveDate; + +fn main() { + tracing_subscriber::fmt::init(); + + let date = NaiveDate::from_ymd_opt(2026, 1, 1).unwrap(); + let mut commons = BgpkitCommons::new(); + + println!("Loading RPKISPOOL data for {} ...", date); + let source = HistoricalRpkiSource::RpkiSpools(RpkiSpoolsCollector::default()); + commons + .load_rpki_historical(date, source) + .expect("failed to load RPKISPOOL data"); + + let prefix = "1.1.1.0/24"; + println!("\nROAs covering {}:", prefix); + match commons.rpki_lookup_by_prefix(prefix) { + Ok(roas) => { + for roa in &roas { + println!( + " prefix={} AS{} max_length={}", + roa.prefix, roa.asn, roa.max_length, + ); + } + if roas.is_empty() { + println!(" (none)"); + } + } + Err(e) => println!(" Error: {}", e), + } + + // Validate Cloudflare's origin for 1.1.1.0/24 + let asn = 13335; + println!( + "\nValidation for {} origin AS{}: {:?}", + prefix, + asn, + commons.rpki_validate(asn, prefix).unwrap() + ); +} diff --git a/src/lib.rs b/src/lib.rs index 7452a38..6a674c9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -389,6 +389,9 @@ impl BgpkitCommons { rpki::HistoricalRpkiSource::RpkiViews(collector) => { self.rpki_trie = Some(rpki::RpkiTrie::from_rpkiviews(collector, date)?); } + rpki::HistoricalRpkiSource::RpkiSpools(collector) => { + self.rpki_trie = Some(rpki::RpkiTrie::from_rpkispools(collector, date)?); + } } Ok(()) } @@ -432,6 +435,12 @@ impl BgpkitCommons { rpki::HistoricalRpkiSource::RpkiViews(_) => { self.rpki_trie = Some(rpki::RpkiTrie::from_rpkiviews_files(urls, 
date)?); } + rpki::HistoricalRpkiSource::RpkiSpools(_) => { + // For RPKISPOOL, each URL is a tar.zst archive; load the first one + if let Some(url) = urls.first() { + self.rpki_trie = Some(rpki::RpkiTrie::from_rpkispools_url(url, date)?); + } + } } Ok(()) } @@ -466,6 +475,9 @@ impl BgpkitCommons { rpki::HistoricalRpkiSource::RpkiViews(collector) => { rpki::list_rpkiviews_files(collector, date) } + rpki::HistoricalRpkiSource::RpkiSpools(collector) => { + rpki::list_rpkispools_files(collector, date) + } } } diff --git a/src/rpki/mod.rs b/src/rpki/mod.rs index cb9aa21..544e8af 100644 --- a/src/rpki/mod.rs +++ b/src/rpki/mod.rs @@ -136,6 +136,7 @@ mod cloudflare; mod ripe_historical; pub(crate) mod rpki_client; +mod rpkispools; mod rpkiviews; use chrono::{DateTime, NaiveDate, NaiveDateTime, Utc}; @@ -146,6 +147,9 @@ use crate::errors::{load_methods, modules}; use crate::{BgpkitCommons, BgpkitCommonsError, LazyLoadable, Result}; pub use ripe_historical::list_ripe_files; use rpki_client::RpkiClientData; +pub use rpkispools::{ + RpkiSpoolsCollector, RpkiSpoolsData, list_rpkispools_files, parse_ccr, parse_rpkispools_archive, +}; pub use rpkiviews::{RpkiViewsCollector, list_rpkiviews_files}; use serde::{Deserialize, Serialize}; use std::fmt::Display; @@ -210,8 +214,10 @@ pub enum HistoricalRpkiSource { /// RIPE NCC historical archives (data from all 5 RIRs) #[default] Ripe, - /// RPKIviews collector + /// RPKIviews collector (tgz archives with rpki-client JSON) RpkiViews(RpkiViewsCollector), + /// RPKISPOOL collector (tar.zst archives with CCR files) + RpkiSpools(RpkiSpoolsCollector), } impl std::fmt::Display for HistoricalRpkiSource { @@ -219,6 +225,9 @@ impl std::fmt::Display for HistoricalRpkiSource { match self { HistoricalRpkiSource::Ripe => write!(f, "RIPE NCC"), HistoricalRpkiSource::RpkiViews(collector) => write!(f, "RPKIviews ({})", collector), + HistoricalRpkiSource::RpkiSpools(collector) => { + write!(f, "RPKISPOOL ({})", collector) + } } } } diff --git 
a/src/rpki/rpkispools.rs b/src/rpki/rpkispools.rs new file mode 100644 index 0000000..03415b4 --- /dev/null +++ b/src/rpki/rpkispools.rs @@ -0,0 +1,628 @@ +//! Load RPKI data from RPKISPOOL archives. +//! +//! The RPKISPOOL format ([draft-snijders-rpkispool-format]) provides historical +//! RPKI data in `.tar.zst` archives containing CCR (Canonical Cache Representation) +//! files ([draft-ietf-sidrops-rpki-ccr]) with validated ROA and ASPA payloads. +//! +//! Each RPKISPOOL archive contains CCR files from multiple vantage points, +//! with multiple snapshots per day. CCR files encode VRPs (Validated ROA Payloads) +//! and VAPs (Validated ASPA Payloads) in DER-encoded ASN.1. +//! +//! RPKISPOOL data is available from three mirrors: +//! - <https://josephine.sobornost.net/rpkidata/rpkispools> (Netherlands) +//! - <https://dango.attn.jp/rpkidata/rpkispools> (Japan) +//! - <https://rpkiviews.kerfuffle.net/rpkidata/rpkispools> (United States) +//! +//! [draft-snijders-rpkispool-format]: https://datatracker.ietf.org/doc/draft-snijders-rpkispool-format/ +//! [draft-ietf-sidrops-rpki-ccr]: https://datatracker.ietf.org/doc/draft-ietf-sidrops-rpki-ccr/ + +use crate::Result; +use crate::rpki::{Aspa, Roa, RpkiFile, RpkiTrie}; +use chrono::{DateTime, Datelike, NaiveDate, Utc}; +use ipnet::IpNet; +use serde::{Deserialize, Serialize}; +use std::io::Read; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; +use std::str::FromStr; +use tracing::info; + +/// Available RPKISPOOL mirror collectors. +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)] +pub enum RpkiSpoolsCollector { + /// josephine.sobornost.net - A2B Internet (AS51088), Amsterdam, Netherlands + #[default] + SobornostNet, + /// dango.attn.jp - Internet Initiative Japan (AS2497), Tokyo, Japan + AttnJp, + /// rpkiviews.kerfuffle.net - Kerfuffle, LLC (AS35008), Fremont, California, United States + KerfuffleNet, +} + +impl RpkiSpoolsCollector { + /// Get the HTTPS base URL for this collector's RPKISPOOL directory. 
+ pub fn base_url(&self) -> &'static str { + match self { + RpkiSpoolsCollector::SobornostNet => { + "https://josephine.sobornost.net/rpkidata/rpkispools" + } + RpkiSpoolsCollector::AttnJp => "https://dango.attn.jp/rpkidata/rpkispools", + RpkiSpoolsCollector::KerfuffleNet => { + "https://rpkiviews.kerfuffle.net/rpkidata/rpkispools" + } + } + } + + /// Get all available collectors + pub fn all() -> Vec { + vec![ + RpkiSpoolsCollector::SobornostNet, + RpkiSpoolsCollector::AttnJp, + RpkiSpoolsCollector::KerfuffleNet, + ] + } +} + +impl std::fmt::Display for RpkiSpoolsCollector { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RpkiSpoolsCollector::SobornostNet => write!(f, "sobornost.net"), + RpkiSpoolsCollector::AttnJp => write!(f, "attn.jp"), + RpkiSpoolsCollector::KerfuffleNet => write!(f, "kerfuffle.net"), + } + } +} + +impl FromStr for RpkiSpoolsCollector { + type Err = String; + + fn from_str(s: &str) -> std::result::Result { + match s.to_lowercase().as_str() { + "sobornost.net" | "josephine.sobornost.net" => Ok(RpkiSpoolsCollector::SobornostNet), + "attn.jp" | "dango.attn.jp" => Ok(RpkiSpoolsCollector::AttnJp), + "kerfuffle.net" | "rpkiviews.kerfuffle.net" => Ok(RpkiSpoolsCollector::KerfuffleNet), + _ => Err(format!("unknown RPKISPOOL collector: {}", s)), + } + } +} + +/// Parsed data from an RPKISPOOL CCR file. +pub struct RpkiSpoolsData { + /// Validated ROA Payloads + pub roas: Vec, + /// Validated ASPA Payloads + pub aspas: Vec, +} + +/// Build the RPKISPOOL changelog archive URL for a given date. +/// +/// The RPKISPOOL archive contains CCR snapshots throughout the day, +/// which is much more efficient to parse than the initstate archive. 
+pub fn rpkispool_url(collector: RpkiSpoolsCollector, date: NaiveDate) -> String { + format!( + "{}/{:04}/{:02}/{:02}/{:04}{:02}{:02}-rpkispool.tar.zst", + collector.base_url(), + date.year(), + date.month(), + date.day(), + date.year(), + date.month(), + date.day() + ) +} + +/// Build the initstate archive URL for a given date. +#[allow(dead_code)] +pub fn initstate_url(collector: RpkiSpoolsCollector, date: NaiveDate) -> String { + format!( + "{}/{:04}/{:02}/{:02}/{:04}{:02}{:02}-initstate.tar.zst", + collector.base_url(), + date.year(), + date.month(), + date.day(), + date.year(), + date.month(), + date.day() + ) +} + +/// List available RPKISPOOL files for a given date. +/// +/// Returns the RPKISPOOL archive URL (contains CCR files) for the date. +pub fn list_rpkispools_files( + collector: RpkiSpoolsCollector, + date: NaiveDate, +) -> Result> { + let url = rpkispool_url(collector, date); + let timestamp = date + .and_hms_opt(0, 0, 0) + .and_then(|dt| DateTime::from_naive_utc_and_offset(dt, Utc).into()); + + Ok(vec![RpkiFile { + url, + timestamp: timestamp.unwrap_or_else(|| DateTime::from_timestamp(0, 0).unwrap()), + size: None, + rir: None, + collector: None, + }]) +} + +// ============================================================================ +// CCR DER Parsing +// ============================================================================ + +/// Parse a CCR (Canonical Cache Representation) file and extract ROAs and ASPAs. 
+/// +/// The CCR format (draft-ietf-sidrops-rpki-ccr) is a DER-encoded ASN.1 structure: +/// ```text +/// SEQUENCE { +/// OID (id-ct-rpkiCanonicalCacheRepresentation), +/// [0] EXPLICIT SEQUENCE { -- the CCR content +/// SEQUENCE { OID } -- hashAlg (SHA-256) +/// GeneralizedTime -- producedAt +/// [1] ManifestState OPTIONAL +/// [2] ROAPayloadState OPTIONAL -- VRPs +/// [3] ASPAPayloadState OPTIONAL -- VAPs +/// [4] TrustAnchorState OPTIONAL +/// [5] RouterKeyState OPTIONAL +/// } +/// } +/// ``` +pub fn parse_ccr(data: &[u8]) -> Result { + use bcder::Mode; + use bcder::decode::SliceSource; + + let source = SliceSource::new(data); + Mode::Der.decode(source, parse_ccr_content).map_err(|e| { + crate::BgpkitCommonsError::data_source_error( + "RPKISPOOL", + format!("Failed to parse CCR: {}", e), + ) + }) +} + +/// Parse the outer ContentInfo-like wrapper and extract VRPs/VAPs. +fn parse_ccr_content( + cons: &mut bcder::decode::Constructed, +) -> std::result::Result> { + use bcder::{Oid, Tag}; + + cons.take_sequence(|cons| { + // OID: id-ct-rpkiCanonicalCacheRepresentation (1.2.840.113549.1.9.16.1.54) + let _oid = Oid::take_from(cons)?; + + // [0] EXPLICIT - the CCR content + cons.take_constructed_if(Tag::CTX_0, |cons| { + cons.take_sequence(|cons| { + // hashAlg: DigestAlgorithmIdentifier (SEQUENCE { OID }) + cons.take_sequence(|cons| { + let _hash_oid = Oid::take_from(cons)?; + // Some implementations include NULL parameters + cons.take_opt_null()?; + Ok(()) + })?; + + // producedAt: GeneralizedTime - skip + cons.take_value(|_tag, content| { + content.as_primitive()?.skip_all()?; + Ok(()) + })?; + + let mut roas = Vec::new(); + let mut aspas = Vec::new(); + + // [1] ManifestState OPTIONAL - skip + cons.take_opt_constructed_if(Tag::CTX_1, |cons| { + // Skip all manifest content + cons.capture_all()?; + Ok(()) + })?; + + // [2] ROAPayloadState OPTIONAL - parse + if let Some(roa_data) = + cons.take_opt_constructed_if(Tag::CTX_2, |cons| parse_roa_payload_state(cons))? 
+ { + roas = roa_data; + } + + // [3] ASPAPayloadState OPTIONAL - parse + if let Some(aspa_data) = + cons.take_opt_constructed_if(Tag::CTX_3, |cons| parse_aspa_payload_state(cons))? + { + aspas = aspa_data; + } + + // Skip remaining optional fields ([4] TAS, [5] RKS, ...) + // by consuming all remaining content + cons.capture_all()?; + + Ok(RpkiSpoolsData { roas, aspas }) + }) + }) + }) +} + +/// Parse ROAPayloadState: +/// ```text +/// ROAPayloadState ::= SEQUENCE { +/// rps SEQUENCE OF ROAPayloadSet, +/// hash Digest } +/// ``` +fn parse_roa_payload_state( + cons: &mut bcder::decode::Constructed, +) -> std::result::Result, bcder::decode::DecodeError> { + // ROAPayloadState is a SEQUENCE + cons.take_sequence(|cons| { + // rps: SEQUENCE OF ROAPayloadSet + let roas = cons.take_sequence(|cons| { + let mut all_roas = Vec::new(); + // Each ROAPayloadSet + while let Some(set_roas) = cons.take_opt_sequence(|cons| parse_roa_payload_set(cons))? { + all_roas.extend(set_roas); + } + Ok(all_roas) + })?; + + // hash: Digest (OCTET STRING) - skip + cons.capture_all()?; + + Ok(roas) + }) +} + +/// Parse ROAPayloadSet: +/// ```text +/// ROAPayloadSet ::= SEQUENCE { +/// asID ASID, +/// ipAddrBlocks SEQUENCE (SIZE(1..2)) OF ROAIPAddressFamily } +/// ``` +fn parse_roa_payload_set( + cons: &mut bcder::decode::Constructed, +) -> std::result::Result, bcder::decode::DecodeError> { + let asn = cons.take_u32()?; + + // ipAddrBlocks: SEQUENCE (SIZE(1..2)) OF ROAIPAddressFamily + let roas = cons.take_sequence(|cons| { + let mut roas = Vec::new(); + // Each ROAIPAddressFamily + while let Some(family_roas) = + cons.take_opt_sequence(|cons| parse_roa_ip_address_family(cons, asn))? 
+ { + roas.extend(family_roas); + } + Ok(roas) + })?; + + Ok(roas) +} + +/// Parse ROAIPAddressFamily (from RFC 9582): +/// ```text +/// ROAIPAddressFamily ::= SEQUENCE { +/// addressFamily OCTET STRING (SIZE (2..3)), +/// addresses SEQUENCE OF ROAIPAddress } +/// +/// ROAIPAddress ::= SEQUENCE { +/// address IPAddress, -- BIT STRING +/// maxLength INTEGER OPTIONAL } +/// ``` +fn parse_roa_ip_address_family( + cons: &mut bcder::decode::Constructed, + asn: u32, +) -> std::result::Result, bcder::decode::DecodeError> { + // addressFamily: OCTET STRING (2 bytes for AFI) + let family_bytes = bcder::OctetString::take_from(cons)?; + let family_slice = family_bytes.to_bytes(); + + let is_ipv4 = match family_slice.as_ref() { + [0, 1] => true, // IPv4 + [0, 2] => false, // IPv6 + [0, 1, _] => true, // IPv4 with SAFI + [0, 2, _] => false, // IPv6 with SAFI + _ => { + return Err(cons.content_err("unknown address family in ROAIPAddressFamily")); + } + }; + + // addresses: SEQUENCE OF ROAIPAddress + let roas = cons.take_sequence(|cons| { + let mut roas = Vec::new(); + while let Some(roa) = + cons.take_opt_sequence(|cons| parse_roa_ip_address(cons, asn, is_ipv4))? 
+ { + roas.push(roa); + } + Ok(roas) + })?; + + Ok(roas) +} + +/// Parse a single ROAIPAddress: +/// ```text +/// ROAIPAddress ::= SEQUENCE { +/// address IPAddress, -- BIT STRING representing prefix +/// maxLength INTEGER OPTIONAL } +/// ``` +fn parse_roa_ip_address( + cons: &mut bcder::decode::Constructed, + asn: u32, + is_ipv4: bool, +) -> std::result::Result> { + // address: BIT STRING + let bit_string = bcder::BitString::take_from(cons)?; + let unused_bits = bit_string.unused(); + let octets = bit_string.octet_bytes(); + + let prefix_len = (octets.len() as u8) * 8 - unused_bits; + + let ip_addr: IpAddr = if is_ipv4 { + let mut addr_bytes = [0u8; 4]; + for (i, &b) in octets.iter().enumerate().take(4) { + addr_bytes[i] = b; + } + IpAddr::V4(Ipv4Addr::from(addr_bytes)) + } else { + let mut addr_bytes = [0u8; 16]; + for (i, &b) in octets.iter().enumerate().take(16) { + addr_bytes[i] = b; + } + IpAddr::V6(Ipv6Addr::from(addr_bytes)) + }; + + // maxLength: INTEGER OPTIONAL + let max_length = cons.take_opt_u8()?.unwrap_or(prefix_len); + + let prefix_str = format!("{}/{}", ip_addr, prefix_len); + let prefix: IpNet = prefix_str + .parse() + .map_err(|_| cons.content_err(format!("invalid IP prefix: {}", prefix_str)))?; + + Ok(Roa { + prefix, + asn, + max_length, + rir: None, + not_before: None, + not_after: None, + }) +} + +/// Parse ASPAPayloadState: +/// ```text +/// ASPAPayloadState ::= SEQUENCE { +/// aps SEQUENCE OF ASPAPayloadSet, +/// hash Digest } +/// +/// ASPAPayloadSet ::= SEQUENCE { +/// customerASID ASID, +/// providers SEQUENCE (SIZE(1..MAX)) OF ASID } +/// ``` +fn parse_aspa_payload_state( + cons: &mut bcder::decode::Constructed, +) -> std::result::Result, bcder::decode::DecodeError> { + cons.take_sequence(|cons| { + // aps: SEQUENCE OF ASPAPayloadSet + let aspas = cons.take_sequence(|cons| { + let mut all_aspas = Vec::new(); + while let Some(aspa) = cons.take_opt_sequence(|cons| { + let customer_asn = cons.take_u32()?; + let providers = 
cons.take_sequence(|cons| { + let mut provs = Vec::new(); + while let Some(p) = cons.take_opt_u32()? { + provs.push(p); + } + Ok(provs) + })?; + Ok(Aspa { + customer_asn, + providers, + expires: None, + }) + })? { + all_aspas.push(aspa); + } + Ok(all_aspas) + })?; + + // hash: Digest (OCTET STRING) - skip + cons.capture_all()?; + + Ok(aspas) + }) +} + +// ============================================================================ +// Streaming tar.zst and extracting CCR files +// ============================================================================ + +/// Parse an RPKISPOOL archive from a URL, extracting the first CCR file's VRPs and VAPs. +/// +/// This streams the `.tar.zst` archive and parses the first CCR file found, +/// which contains all validated ROA and ASPA payloads from one vantage point snapshot. +pub fn parse_rpkispools_archive(url: &str) -> Result { + info!("streaming RPKISPOOL archive: {}", url); + + let reader = oneio::get_reader_raw(url).map_err(|e| { + crate::BgpkitCommonsError::data_source_error( + "RPKISPOOL", + format!("Failed to fetch {}: {}", url, e), + ) + })?; + + // Decompress zstd stream + let decoder = zstd::Decoder::new(reader).map_err(|e| { + crate::BgpkitCommonsError::data_source_error( + "RPKISPOOL", + format!("Failed to create zstd decoder: {}", e), + ) + })?; + + // Read tar entries + let mut archive = tar::Archive::new(decoder); + let entries = archive.entries().map_err(|e| { + crate::BgpkitCommonsError::data_source_error( + "RPKISPOOL", + format!("Failed to read tar entries: {}", e), + ) + })?; + + for entry_result in entries { + let mut entry = match entry_result { + Ok(e) => e, + Err(_) => continue, + }; + + let path = match entry.path() { + Ok(p) => p.to_string_lossy().to_string(), + Err(_) => continue, + }; + + // Find the first .ccr file + if path.ends_with(".ccr") { + info!("parsing CCR file: {} ({} bytes)", path, entry.size()); + let mut ccr_data = Vec::with_capacity(entry.size() as usize); + entry.read_to_end(&mut 
ccr_data).map_err(|e| { + crate::BgpkitCommonsError::data_source_error( + "RPKISPOOL", + format!("Failed to read CCR entry {}: {}", path, e), + ) + })?; + + return parse_ccr(&ccr_data); + } + } + + Err(crate::BgpkitCommonsError::data_source_error( + "RPKISPOOL", + format!("No CCR file found in archive: {}", url), + )) +} + +// ============================================================================ +// RpkiTrie integration +// ============================================================================ + +impl RpkiTrie { + /// Load RPKI data from an RPKISPOOL archive for a specific date. + /// + /// This downloads the RPKISPOOL archive for the given date and parses + /// the first CCR file to extract VRPs and ASPAs. + pub fn from_rpkispools(collector: RpkiSpoolsCollector, date: NaiveDate) -> Result { + let url = rpkispool_url(collector, date); + info!( + "loading RPKISPOOL data from {} for date {}", + collector, date + ); + Self::from_rpkispools_url(&url, Some(date)) + } + + /// Load RPKI data from a specific RPKISPOOL archive URL. 
+ pub fn from_rpkispools_url(url: &str, date: Option) -> Result { + let data = parse_rpkispools_archive(url)?; + let mut trie = RpkiTrie::new(date); + trie.insert_roas(data.roas); + for aspa in data.aspas { + if !trie + .aspas + .iter() + .any(|a| a.customer_asn == aspa.customer_asn) + { + trie.aspas.push(aspa); + } + } + Ok(trie) + } +} + +// ============================================================================ +// Tests +// ============================================================================ + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_collector_urls() { + assert_eq!( + RpkiSpoolsCollector::SobornostNet.base_url(), + "https://josephine.sobornost.net/rpkidata/rpkispools" + ); + let date = NaiveDate::from_ymd_opt(2026, 3, 20).unwrap(); + assert_eq!( + rpkispool_url(RpkiSpoolsCollector::AttnJp, date), + "https://dango.attn.jp/rpkidata/rpkispools/2026/03/20/20260320-rpkispool.tar.zst" + ); + assert_eq!( + initstate_url(RpkiSpoolsCollector::KerfuffleNet, date), + "https://rpkiviews.kerfuffle.net/rpkidata/rpkispools/2026/03/20/20260320-initstate.tar.zst" + ); + } + + #[test] + fn test_collector_from_str() { + assert_eq!( + RpkiSpoolsCollector::from_str("sobornost.net").unwrap(), + RpkiSpoolsCollector::SobornostNet + ); + assert_eq!( + RpkiSpoolsCollector::from_str("dango.attn.jp").unwrap(), + RpkiSpoolsCollector::AttnJp + ); + } + + #[test] + fn test_default_collector() { + assert_eq!( + RpkiSpoolsCollector::default(), + RpkiSpoolsCollector::SobornostNet + ); + } + + #[test] + #[ignore] // Requires network access + fn test_from_rpkispools() { + let date = NaiveDate::from_ymd_opt(2026, 3, 20).unwrap(); + let trie = RpkiTrie::from_rpkispools(RpkiSpoolsCollector::AttnJp, date) + .expect("failed to load RPKISPOOL data"); + + let total_roas: usize = trie.trie.iter().map(|(_, roas)| roas.len()).sum(); + println!("loaded {} ROAs from RPKISPOOL for {}", total_roas, date); + println!("Loaded {} ASPAs", trie.aspas.len()); + + 
assert!(total_roas > 0, "Should have loaded some ROAs"); + assert!(!trie.aspas.is_empty(), "Should have loaded some ASPAs"); + } + + #[test] + #[ignore] // Requires network access + fn test_parse_ccr_from_stream() { + let url = "https://dango.attn.jp/rpkidata/rpkispools/2026/03/20/20260320-rpkispool.tar.zst"; + let data = parse_rpkispools_archive(url).expect("failed to parse RPKISPOOL archive"); + + println!("Parsed {} ROAs", data.roas.len()); + println!("Parsed {} ASPAs", data.aspas.len()); + + // Print some sample ROAs + for roa in data.roas.iter().take(5) { + println!( + " ROA: {}/{} AS{} max_length={}", + roa.prefix, + roa.prefix.prefix_len(), + roa.asn, + roa.max_length + ); + } + + // Print some sample ASPAs + for aspa in data.aspas.iter().take(5) { + println!( + " ASPA: AS{} providers={:?}", + aspa.customer_asn, aspa.providers + ); + } + + assert!(data.roas.len() > 10000, "Expected many ROAs from CCR"); + assert!(!data.aspas.is_empty(), "Expected some ASPAs from CCR"); + } +} From b6a503e21c51e60a8cb193481cc6d166e9fcc954 Mon Sep 17 00:00:00 2001 From: Mingwei Zhang Date: Sat, 21 Mar 2026 09:53:51 -0700 Subject: [PATCH 2/2] feat: add rpki_lookup_aspa() method and extend rpkispools example Add BgpkitCommons::rpki_lookup_aspa(customer_asn) to look up ASPA records by customer ASN. Extend the rpkispools example to demonstrate ASPA lookup for AS400644. 
--- examples/rpkispools.rs | 14 ++++++++++++++ src/lib.rs | 4 ++-- src/rpki/mod.rs | 21 +++++++++++++++++++++ 3 files changed, 37 insertions(+), 2 deletions(-) diff --git a/examples/rpkispools.rs b/examples/rpkispools.rs index 6759765..48a0201 100644 --- a/examples/rpkispools.rs +++ b/examples/rpkispools.rs @@ -43,4 +43,18 @@ fn main() { asn, commons.rpki_validate(asn, prefix).unwrap() ); + + // Look up ASPA for AS400644 + let customer_asn = 400644; + println!("\nASPA for AS{}:", customer_asn); + match commons.rpki_lookup_aspa(customer_asn) { + Ok(Some(aspa)) => { + println!( + " customer AS{} -> providers: {:?}", + aspa.customer_asn, aspa.providers + ); + } + Ok(None) => println!(" (no ASPA found)"), + Err(e) => println!(" Error: {}", e), + } } diff --git a/src/lib.rs b/src/lib.rs index 6a674c9..c42ca43 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -75,10 +75,10 @@ //! //! ### [`rpki`] — RPKI Validation //! -//! Feature: `rpki` | Sources: Cloudflare (real-time), RIPE NCC historical, RPKIviews historical +//! Feature: `rpki` | Sources: Cloudflare (real-time), RIPE NCC historical, RPKIviews historical, RPKISPOOL historical //! //! - Load: `load_rpki(optional_date)`, `load_rpki_historical(date, source)`, `load_rpki_from_files(urls, source, date)` -//! - Access: `rpki_validate(asn, prefix)`, `rpki_validate_check_expiry(asn, prefix, timestamp)`, `rpki_lookup_by_prefix(prefix)` +//! - Access: `rpki_validate(asn, prefix)`, `rpki_validate_check_expiry(asn, prefix, timestamp)`, `rpki_lookup_by_prefix(prefix)`, `rpki_lookup_aspa(customer_asn)` //! - Route Origin Authorization (ROA) and ASPA validation, supports real-time and historical sources //! //! ## Examples diff --git a/src/rpki/mod.rs b/src/rpki/mod.rs index 544e8af..a1d7012 100644 --- a/src/rpki/mod.rs +++ b/src/rpki/mod.rs @@ -636,6 +636,27 @@ impl BgpkitCommons { .unwrap() .validate_check_expiry(&prefix, asn, check_time)) } + + /// Look up ASPA records for a given customer ASN. 
+ /// + /// Returns the ASPA record if one exists for the given customer ASN, + /// or `None` if no ASPA is registered. + pub fn rpki_lookup_aspa(&self, customer_asn: u32) -> Result> { + if self.rpki_trie.is_none() { + return Err(BgpkitCommonsError::module_not_loaded( + modules::RPKI, + load_methods::LOAD_RPKI, + )); + } + Ok(self + .rpki_trie + .as_ref() + .unwrap() + .aspas + .iter() + .find(|a| a.customer_asn == customer_asn) + .cloned()) + } } // ============================================================================