From bde31f493f49cc6efc09c84498fd6d932c2f2f3a Mon Sep 17 00:00:00 2001 From: Yureka Date: Fri, 17 Jan 2025 01:32:51 +0100 Subject: [PATCH 1/9] table_impl: add subscription mechanism --- src/table_impl.rs | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/src/table_impl.rs b/src/table_impl.rs index 9fea601..48eaeb3 100644 --- a/src/table_impl.rs +++ b/src/table_impl.rs @@ -4,10 +4,20 @@ use ipnet::IpNet; use nibbletree::Node; use std::sync::Arc; use std::sync::Mutex; +use std::sync::Weak; + +pub type Subscriber = dyn Fn(IpNet, u32, Action) + Send + Sync; + +#[derive(Clone, Debug)] +pub enum Action { + Update(Arc), + Withdraw, +} #[derive(Clone)] pub struct InMemoryTable { pub table: Arc)>>>>, + subscribers: Arc>>>, caches: Arc>, } @@ -44,12 +54,26 @@ impl InMemoryTable { pub fn new(caches: Arc>) -> Self { Self { table: Default::default(), + subscribers: Default::default(), caches, } } + pub fn subscribe(&self, cb: Weak) { + self.subscribers.lock().unwrap().push(cb); + } + pub async fn update_route(&self, path_id: PathId, net: IpNet, route: RouteAttrs) { let compressed = self.caches.lock().unwrap().compress_route_attrs(route); + for subscriber in self + .subscribers + .lock() + .unwrap() + .iter() + .filter_map(Weak::upgrade) + { + (*subscriber)(net, path_id, Action::Update(compressed.clone())); + } let mut table = self.table.lock().unwrap(); @@ -70,6 +94,16 @@ impl InMemoryTable { } pub async fn withdraw_route(&self, path_id: PathId, net: IpNet) { + for subscriber in self + .subscribers + .lock() + .unwrap() + .iter() + .filter_map(Weak::upgrade) + { + (*subscriber)(net, path_id, Action::Withdraw); + } + let mut table = self.table.lock().unwrap(); let is_empty = match table.exact_mut(&net) { From 180aa1f6bcf4cc6aa663dddf6ff455d42ec80596 Mon Sep 17 00:00:00 2001 From: Yureka Date: Fri, 17 Jan 2025 01:34:21 +0100 Subject: [PATCH 2/9] in-memory table can be sync --- src/store_impl.rs | 4 ++-- src/table_impl.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/store_impl.rs b/src/store_impl.rs index e1ccf75..e33c303 100644 --- a/src/store_impl.rs +++ b/src/store_impl.rs @@ -107,13 +107,13 @@ impl Store for InMemoryStore { route: RouteAttrs, ) { let table = self.get_table(table); - table.update_route(path_id, net, route).await; + table.update_route(path_id, net, route); } #[autometrics::autometrics] async fn withdraw_route(&self, path_id: PathId, net: IpNet, table: TableSelector) { let table = self.get_table(table); - table.withdraw_route(path_id, net).await; + table.withdraw_route(path_id, net); } fn get_routes(&self, query: Query) -> Pin + Send>> { diff --git a/src/table_impl.rs b/src/table_impl.rs index 48eaeb3..6484951 100644 --- a/src/table_impl.rs +++ b/src/table_impl.rs @@ -63,7 +63,7 @@ impl InMemoryTable { self.subscribers.lock().unwrap().push(cb); } - pub async fn update_route(&self, path_id: PathId, net: IpNet, route: RouteAttrs) { + pub fn update_route(&self, path_id: PathId, net: IpNet, route: RouteAttrs) { let compressed = self.caches.lock().unwrap().compress_route_attrs(route); for subscriber in self .subscribers @@ -93,7 +93,7 @@ impl InMemoryTable { } } - pub async fn withdraw_route(&self, path_id: PathId, net: IpNet) { + pub fn withdraw_route(&self, path_id: PathId, net: IpNet) { for subscriber in self .subscribers .lock() From db22e81b529185473f121a35c6e7ba2407032ecf Mon Sep 17 00:00:00 2001 From: Yureka Date: Fri, 17 Jan 2025 01:35:31 +0100 Subject: [PATCH 3/9] separate update_route_compressed function --- src/table_impl.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/table_impl.rs b/src/table_impl.rs index 6484951..78f433f 100644 --- a/src/table_impl.rs +++ b/src/table_impl.rs @@ -63,8 +63,12 @@ impl InMemoryTable { self.subscribers.lock().unwrap().push(cb); } - pub fn update_route(&self, path_id: PathId, net: IpNet, route: RouteAttrs) { - let compressed = self.caches.lock().unwrap().compress_route_attrs(route); + pub fn update_route_compressed( + &self, + path_id: PathId, + net: IpNet, + compressed: Arc, + ) { for subscriber in self .subscribers .lock() @@ -92,6 +96,10 @@ impl InMemoryTable { table.insert(&net, insert); } } + pub fn update_route(&self, path_id: PathId, net: IpNet, route: RouteAttrs) { + let compressed = self.caches.lock().unwrap().compress_route_attrs(route); + self.update_route_compressed(path_id, net, compressed) + } pub fn withdraw_route(&self, path_id: PathId, net: IpNet) { for subscriber in self From e0238fc8793746bed6a0d18073f9b864cd00bfb7 Mon Sep 17 00:00:00 2001 From: Yureka Date: Fri, 17 Jan 2025 01:40:45 +0100 Subject: [PATCH 4/9] make tables generic and allow storing Actions --- src/table_impl.rs | 52 +++++++++++++++++++++++++++++++---------------- 1 file changed, 35 insertions(+), 17 deletions(-) diff --git a/src/table_impl.rs b/src/table_impl.rs index 78f433f..24d3d29 100644 --- a/src/table_impl.rs +++ b/src/table_impl.rs @@ -6,18 +6,40 @@ use std::sync::Arc; use std::sync::Mutex; use std::sync::Weak; -pub type Subscriber = dyn Fn(IpNet, u32, Action) + Send + Sync; +pub type Subscriber = dyn Fn(IpNet, u32, Action) + Send + Sync; -#[derive(Clone, Debug)] -pub enum Action { - Update(Arc), +#[derive(Clone, Debug, PartialEq)] +pub enum Action { + Update(T), Withdraw, } +pub trait Compressable { + type Compressed; + fn compress(self, caches: &Arc>) -> Self::Compressed; +} + +impl Compressable for RouteAttrs { + type Compressed = Arc; + fn compress(self, caches: &Arc>) -> Self::Compressed { + caches.lock().unwrap().compress_route_attrs(self) + } +} + +impl Compressable for Action { + type Compressed = Action; + fn compress(self, caches: &Arc>) -> Self::Compressed { + match self { + Action::Withdraw => Action::Withdraw, + Action::Update(attrs) => Action::Update(attrs.compress(caches)), + } + } +} + #[derive(Clone)] -pub struct InMemoryTable { - pub table: Arc)>>>>, - subscribers: Arc>>>, +pub struct InMemoryTable { + pub table: Arc>>>, + subscribers: Arc>>>>, caches: Arc>, } @@ -50,7 +72,7 @@ impl NodeExt for Node)>> { } } -impl InMemoryTable { +impl> InMemoryTable { pub fn new(caches: Arc>) -> Self { Self { table: Default::default(), @@ -59,16 +81,11 @@ impl InMemoryTable { } } - pub fn subscribe(&self, cb: Weak) { + pub fn subscribe(&self, cb: Weak>) { self.subscribers.lock().unwrap().push(cb); } - pub fn update_route_compressed( - &self, - path_id: PathId, - net: IpNet, - compressed: Arc, - ) { + pub fn update_route_compressed(&self, path_id: PathId, net: IpNet, compressed: C) { for subscriber in self .subscribers .lock() @@ -96,8 +113,9 @@ impl InMemoryTable { table.insert(&net, insert); } } - pub fn update_route(&self, path_id: PathId, net: IpNet, route: RouteAttrs) { - let compressed = self.caches.lock().unwrap().compress_route_attrs(route); + + pub fn update_route(&self, path_id: PathId, net: IpNet, route: T) { + let compressed = route.compress(&self.caches); self.update_route_compressed(path_id, net, compressed) } From 8e1a401df14d0a86c8f528d68a93adec2cfd302f Mon Sep 17 00:00:00 2001 From: Yureka Date: Sun, 19 Jan 2025 18:52:35 +0100 Subject: [PATCH 5/9] InMemoryTableState --- src/store_impl.rs | 5 +- src/table_impl.rs | 128 ++++++++++++++++++++++++++++++++-------------- 2 files changed, 93 insertions(+), 40 deletions(-) diff --git a/src/store_impl.rs b/src/store_impl.rs index e33c303..8c400fd 100644 --- a/src/store_impl.rs +++ b/src/store_impl.rs @@ -166,8 +166,9 @@ impl Store for InMemoryStore { tables .into_par_iter() .flat_map(move |(table_sel, table)| { - let table = table.table.lock().unwrap(); - table + let state = table.state.lock().unwrap(); + state + .table .get_routes(Some(&query.net_query)) .map(move |(net, _path_id, route)| { let table_sel = table_sel.clone(); diff --git a/src/table_impl.rs b/src/table_impl.rs index 24d3d29..ed421a9 100644 --- a/src/table_impl.rs +++ b/src/table_impl.rs @@ -36,9 +36,21 @@ impl Compressable for Action { } } +pub struct InMemoryTableState> { + pub table: Node>, +} + +impl Default for InMemoryTableState { + fn default() -> Self { + Self { + table: Default::default(), + } + } +} + #[derive(Clone)] pub struct InMemoryTable { - pub table: Arc>>>, + pub state: Arc>>, subscribers: Arc>>>>, caches: Arc>, } @@ -72,10 +84,80 @@ impl NodeExt for Node)>> { } } +impl std::iter::Extend<(IpNet, PathId, T)> for InMemoryTableState { + fn extend(&mut self, iter: I) + where + I: IntoIterator, + { + for (net, num, attrs) in iter { + self.update_route(num, net, attrs); + } + } +} +impl std::iter::FromIterator<(IpNet, PathId, T)> for InMemoryTableState { + fn from_iter(iter: I) -> Self + where + I: IntoIterator, + { + let mut table = Self::default(); + table.extend(iter); + table + } +} +impl std::iter::Extend<(IpNet, PathId, Action)> + for InMemoryTableState +{ + fn extend(&mut self, iter: I) + where + I: IntoIterator)>, + { + for (net, num, action) in iter { + match action { + Action::Update(attrs) => self.update_route(num, net, attrs), + Action::Withdraw => self.withdraw_route(num, net), + } + } + } +} + +impl InMemoryTableState { + pub fn update_route(&mut self, path_id: PathId, net: IpNet, attrs: T) { + let mut new_insert = None; + let entry = self.table.exact_mut(&net).unwrap_or_else(|| { + new_insert = Some(Vec::new()); + new_insert.as_mut().unwrap() + }); + + match entry.binary_search_by_key(&path_id, |(k, _)| *k) { + Ok(index) => drop(std::mem::replace(&mut entry[index], (path_id, attrs))), + Err(index) => entry.insert(index, (path_id, attrs)), + }; + + if let Some(insert) = new_insert { + self.table.insert(&net, insert); + } + } + + pub fn withdraw_route(&mut self, path_id: PathId, net: IpNet) { + let is_empty = match self.table.exact_mut(&net) { + Some(entry) => { + if let Ok(index) = entry.binary_search_by_key(&path_id, |(k, _)| *k) { + entry.remove(index); + } + entry.is_empty() + } + None => return, + }; + if is_empty { + self.table.remove(&net); + } + } +} + impl> InMemoryTable { pub fn new(caches: Arc>) -> Self { Self { - table: Default::default(), + state: Default::default(), subscribers: Default::default(), caches, } @@ -85,7 +167,8 @@ impl> InMemoryTable { self.subscribers.lock().unwrap().push(cb); } - pub fn update_route_compressed(&self, path_id: PathId, net: IpNet, compressed: C) { + pub fn update_route(&self, path_id: PathId, net: IpNet, route: T) { + let compressed = route.compress(&self.caches); for subscriber in self .subscribers .lock() @@ -96,27 +179,8 @@ impl> InMemoryTable { (*subscriber)(net, path_id, Action::Update(compressed.clone())); } - let mut table = self.table.lock().unwrap(); - - let mut new_insert = None; - let entry = table.exact_mut(&net).unwrap_or_else(|| { - new_insert = Some(Vec::new()); - new_insert.as_mut().unwrap() - }); - - match entry.binary_search_by_key(&path_id, |(k, _)| *k) { - Ok(index) => drop(std::mem::replace(&mut entry[index], (path_id, compressed))), - Err(index) => entry.insert(index, (path_id, compressed)), - }; - - if let Some(insert) = new_insert { - table.insert(&net, insert); - } - } - - pub fn update_route(&self, path_id: PathId, net: IpNet, route: T) { - let compressed = route.compress(&self.caches); - self.update_route_compressed(path_id, net, compressed) + let mut state = self.state.lock().unwrap(); + state.update_route(path_id, net, compressed); } pub fn withdraw_route(&self, path_id: PathId, net: IpNet) { @@ -130,19 +194,7 @@ impl> InMemoryTable { (*subscriber)(net, path_id, Action::Withdraw); } - let mut table = self.table.lock().unwrap(); - - let is_empty = match table.exact_mut(&net) { - Some(entry) => { - if let Ok(index) = entry.binary_search_by_key(&path_id, |(k, _)| *k) { - entry.remove(index); - } - entry.is_empty() - } - None => return, - }; - if is_empty { - table.remove(&net); - } + let mut state = self.state.lock().unwrap(); + state.withdraw_route(path_id, net); } } From 42fef2a03c596047d34bcb94b3dffb095a5e9b30 Mon Sep 17 00:00:00 2001 From: Yureka Date: Fri, 17 Jan 2025 21:38:36 +0100 Subject: [PATCH 6/9] add table_stream module --- Cargo.lock | 150 +++++++++++++++++++++++++------- Cargo.toml | 2 + src/lib.rs | 1 + src/table_impl.rs | 2 +- src/table_stream.rs | 206 ++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 327 insertions(+), 34 deletions(-) create mode 100644 src/table_stream.rs diff --git a/Cargo.lock b/Cargo.lock index 20cc504..62a21de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -91,6 +91,28 @@ version = "1.0.79" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "080e9890a082662b09c1ad45f567faeeb47f22b5fb23895fbe1e651e718e25ca" +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.96", +] + [[package]] name = "async-trait" version = "0.1.77" @@ -99,7 +121,7 @@ checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.96", ] [[package]] @@ -356,7 +378,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.96", ] [[package]] @@ -393,6 +415,7 @@ name = "fernglas" version = "0.2.1" dependencies = [ "anyhow", + "async-stream", "async-trait", "autometrics", "axum", @@ -410,6 +433,7 @@ dependencies = [ "nibbletree", "rayon", "regex", + "rstest", "serde", "serde_json", "tokio", @@ -494,7 +518,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.96", ] [[package]] @@ -541,6 +565,12 @@ version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" +[[package]] +name = "glob" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2" + [[package]] name = "h2" version = "0.4.2" @@ -553,7 +583,7 @@ dependencies = [ "futures-sink", "futures-util", "http", - "indexmap 2.2.2", + "indexmap 2.7.1", "slab", "tokio", "tokio-util", @@ -575,6 +605,12 @@ version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +[[package]] +name = "hashbrown" +version = "0.15.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289" + [[package]] name = "heck" version = "0.4.1" @@ -781,12 +817,12 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.2.2" +version = "2.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "824b2ae422412366ba479e8111fd301f7b5faece8149317bb81925979a53f520" +checksum = "8c9c992b02b5b4c94ea26e32fe5bccb7aa7d9f390ab5c1221ff895bc7ea8b652" dependencies = [ "equivalent", - "hashbrown 0.14.3", + "hashbrown 0.15.2", ] [[package]] @@ -1144,7 +1180,7 @@ dependencies = [ "proc-macro2", "proc-macro2-diagnostics", "quote", - "syn 2.0.48", + "syn 2.0.96", ] [[package]] @@ -1170,14 +1206,14 @@ checksum = "266c042b60c9c76b8d53061e52b2e0d1116abc57cefc8c5cd671619a56ac3690" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.96", ] [[package]] name = "pin-project-lite" -version = "0.2.13" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" +checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" [[package]] name = "pin-utils" @@ -1208,9 +1244,9 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "proc-macro2" -version = "1.0.78" +version = "1.0.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae" +checksum = "60946a68e5f9d28b0dc1c21bb8a97ee7d018a8b322fa57838ba31cc878e22d99" dependencies = [ "unicode-ident", ] @@ -1223,7 +1259,7 @@ checksum = "af066a9c399a26e020ada66a034357a868728e72cd426f3adcd35f80d88d88c8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.96", "version_check", "yansi", ] @@ -1273,9 +1309,9 @@ checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" [[package]] name = "quote" -version = "1.0.35" +version = "1.0.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef" +checksum = "0e4dccaaaf89514f546c693ddc140f729f958c247918a13380cccc6078391acc" dependencies = [ "proc-macro2", ] @@ -1356,9 +1392,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.10.3" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15" +checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" dependencies = [ "aho-corasick", "memchr", @@ -1368,9 +1404,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.5" +version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5bb987efffd3c6d0d8f5f89510bb458559eab11e4f869acb20bf845e016259cd" +checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" dependencies = [ "aho-corasick", "memchr", @@ -1379,9 +1415,15 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.8.2" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" + +[[package]] +name = "relative-path" +version = "1.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" +checksum = "ba39f3699c378cd8970968dcbff9c43159ea4cfbd88d43c00b22f2ef10a435d2" [[package]] name = "resolv-conf" @@ -1393,12 +1435,48 @@ dependencies = [ "quick-error", ] +[[package]] +name = "rstest" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03e905296805ab93e13c1ec3a03f4b6c4f35e9498a3d5fa96dc626d22c03cd89" +dependencies = [ + "rstest_macros", + "rustc_version", +] + +[[package]] +name = "rstest_macros" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef0053bbffce09062bee4bcc499b0fbe7a57b879f1efe088d6d8d4c7adcdef9b" +dependencies = [ + "cfg-if", + "glob", + "proc-macro2", + "quote", + "regex", + "relative-path", + "rustc_version", + "syn 2.0.96", + "unicode-ident", +] + [[package]] name = "rustc-demangle" version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +[[package]] +name = "rustc_version" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92" +dependencies = [ + "semver", +] + [[package]] name = "rustversion" version = "1.0.14" @@ -1417,6 +1495,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "semver" +version = "1.0.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f79dfe2d285b0488816f30e700a7438c5a73d816b5b7d3ac72fbc48b0d185e03" + [[package]] name = "serde" version = "1.0.196" @@ -1434,7 +1518,7 @@ checksum = "33c85360c95e7d137454dc81d9a4ed2b8efd8fbe19cee57357b32b9771fccb67" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.96", ] [[package]] @@ -1466,7 +1550,7 @@ version = "0.9.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adf8a49373e98a4c5f0ceb5d05aa7c648d75f63774981ed95b7c7443bbd50c6e" dependencies = [ - "indexmap 2.2.2", + "indexmap 2.7.1", "itoa", "ryu", "serde", @@ -1526,9 +1610,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.48" +version = "2.0.96" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f3531638e407dfc0814761abb7c00a5b54992b849452a0646b7f65c9f770f3f" +checksum = "d5d0adab1ae378d7f53bdebc67a39f1f151407ef230f0ce2883572f5d8985c80" dependencies = [ "proc-macro2", "quote", @@ -1570,7 +1654,7 @@ checksum = "fa0faa943b50f3db30a20aa7e265dbc66076993efed8463e8de414e5d06d3471" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.96", ] [[package]] @@ -1614,7 +1698,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.96", ] [[package]] @@ -1688,7 +1772,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.96", ] [[package]] @@ -1726,9 +1810,9 @@ checksum = "08f95100a766bf4f8f28f90d77e0a5461bbdb219042e7679bebe79004fed8d75" [[package]] name = "unicode-ident" -version = "1.0.12" +version = "1.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" +checksum = "a210d160f08b701c8721ba1c726c11662f877ea6b7094007e1ca9a1041945034" [[package]] name = "unicode-normalization" @@ -1807,7 +1891,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.96", "wasm-bindgen-shared", ] @@ -1829,7 +1913,7 @@ checksum = "642f325be6301eb8107a83d12a8ac6c1e1c54345a7ef1a9261962dfefda09e66" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.96", "wasm-bindgen-backend", "wasm-bindgen-shared", ] diff --git a/Cargo.toml b/Cargo.toml index 33b16e5..26d0305 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,8 @@ hickory-resolver = "0.24" include_dir = { version = "0.7", optional = true } mime_guess = { version = "2.0", optional = true } figment = { version = "0.10", features = ["yaml", "env"] } +async-stream = "0.3.6" +rstest = { version = "0.24.0", default-features = false } [features] embed-static = ["include_dir", "mime_guess"] diff --git a/src/lib.rs b/src/lib.rs index 27939d6..c829180 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,6 +7,7 @@ pub mod route_distinguisher; pub mod store; pub mod store_impl; pub mod table_impl; +pub mod table_stream; use serde::Deserialize; use std::collections::HashMap; diff --git a/src/table_impl.rs b/src/table_impl.rs index ed421a9..86f2a38 100644 --- a/src/table_impl.rs +++ b/src/table_impl.rs @@ -52,7 +52,7 @@ impl Default for InMemoryTableState { pub struct InMemoryTable { pub state: Arc>>, subscribers: Arc>>>>, - caches: Arc>, + pub caches: Arc>, } pub trait NodeExt { diff --git a/src/table_stream.rs b/src/table_stream.rs new file mode 100644 index 0000000..3ea5019 --- /dev/null +++ b/src/table_stream.rs @@ -0,0 +1,206 @@ +use crate::store::PathId; +use crate::table_impl::*; +use ipnet::IpNet; +use std::sync::Arc; +use std::sync::Mutex; +use tokio::sync::Notify; + +struct TableStreamState { + buf: Vec<(IpNet, PathId, Action)>, + // Overflow table; used when the buffer is full + // IMPORTANT: only stores the latest Action for each prefix, + // so it will never get larger than a full table + overflow: InMemoryTableState>, +} +impl Default for TableStreamState { + fn default() -> Self { + Self { + buf: Vec::with_capacity(128), + overflow: Default::default(), + } + } +} +impl Extend<(IpNet, PathId, Action)> for TableStreamState { + fn extend(&mut self, i: I) + where + I: IntoIterator)>, + { + let mut iter = i.into_iter(); + while self.buf.len() < 128 { + let Some(next) = iter.next() else { + break; + }; + self.buf.push(next); + } + self.overflow.extend(iter) + } +} +impl FromIterator<(IpNet, PathId, Action)> for TableStreamState { + fn from_iter(iter: I) -> Self + where + I: IntoIterator)>, + { + let mut this: Self = Default::default(); + this.extend(iter); + this + } +} + +/// Provides a flow-controlled Stream monitoring all actions on a table, suitable for use with slow readers +pub fn table_stream( + table: &InMemoryTable, +) -> impl futures_util::Stream)> +where + C: Clone + Send + Sync + 'static + PartialEq + std::fmt::Debug, + T: Compressable + Clone + 'static, +{ + // Keeps track of the last state sent to the peer + let mut rib_out: InMemoryTableState = Default::default(); + + let state: TableStreamState = + // Copy the initial contents of the table + table.state.lock().unwrap().table.iter() + .flat_map(|(net, v)| v.iter().map(move |(num, attrs)| (net, *num, Action::Update(attrs.clone())))) + .collect(); + let state = Arc::new(Mutex::new(state)); + // Used to wake up the reader task when new routes have been inserted into the state + let notify = Arc::new(Notify::new()); + // Mark as dirty, since we added initial contents + notify.notify_one(); + + let subscriber: Arc> = { + let state = state.clone(); + let notify = notify.clone(); + Arc::new(move |net, num, action| { + state.lock().unwrap().extend(Some((net, num, action))); + notify.notify_one(); + }) + }; + table.subscribe(Arc::downgrade(&subscriber)); + async_stream::stream! { + let mut local_state: TableStreamState = Default::default(); + #[allow(unused)] + let subscriber = subscriber; + loop { + notify.notified().await; + std::mem::swap(&mut local_state, &mut *state.lock().unwrap()); + + if local_state.buf.len() >= 128 { + // We merge the buf into the overflow table, but so that items from the overflow + // table are preferred (since they arrived later) + for (net, num, action) in local_state.buf.drain(..).rev() { + let overflow_entry = local_state.overflow.table.exact(&net).and_then(|entry| { + match entry.binary_search_by_key(&num, |(k, _)| *k) { + Ok(index) => Some(entry[index].1.clone()), + Err(_) => None, + } + }); + if overflow_entry.is_none() { + local_state.overflow.update_route(num, net, action); + } + } + + for (net, v) in local_state.overflow.table.iter() { + for (num, action) in v { + // filter out routes that have flapped back to the state we last emitted to the stream + let rib_out_entry = rib_out.table.exact(&net).and_then(|entry| { + match entry.binary_search_by_key(&num, |(k, _)| k) { + Ok(index) => Some(entry[index].1.clone()), + Err(_) => None, + } + }); + let actual_action = match (action, rib_out_entry) { + (Action::Update(attrs), None) => Some(Action::Update(attrs.clone())), + (Action::Update(attrs), Some(existing_route)) if *attrs != existing_route => Some(Action::Update(attrs.clone())), + (Action::Withdraw, Some(_)) => Some(Action::Withdraw), + _ => None, + }; + rib_out.extend(actual_action.map(|action| (net, *num, action))); + yield (net, *num, action.clone()); + } + } + + drop(std::mem::take(&mut local_state.overflow)); + } else { + for i in local_state.buf.drain(..) { + rib_out.extend(Some(i.clone())); + yield i; + } + } + } + + // Keeps the subscription of table events alive as long as the Stream exists + #[allow(unreachable_code)] + drop(subscriber); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::compressed_attrs::decompress_route_attrs; + use crate::store::RouteAttrs; + use futures_util::pin_mut; + use futures_util::StreamExt; + use rstest::rstest; + use std::time::Duration; + + #[rstest] + #[tokio::test] + async fn test_table_stream( + #[values(false, true)] drain_in_between: bool, + #[values(1, 64, 127, 128, 129, 130, 255)] j1: u8, + #[values(1, 64, 127, 128, 129, 130, 255)] j2: u8, + #[values(1, 64, 127, 128, 129, 130, 255)] j3: u8, + ) { + let caches: Arc> = Default::default(); + let prefix = "0.0.0.0/0".parse().unwrap(); + let table_in: InMemoryTable = InMemoryTable::new(caches.clone()); + let table_stream = table_stream(&table_in); + pin_mut!(table_stream); + let mut sent_count: usize = 0; + for j in [&j1, &j2, &j3] { + for i in 0..*j { + table_in.update_route( + 0, + prefix, + RouteAttrs { + nexthop: Some([10u8, 0u8, 0u8, i].into()), + ..Default::default() + }, + ); + } + let mut table_out: InMemoryTableState = Default::default(); + sent_count += *j as usize; + if std::ptr::eq(j, &j3) || drain_in_between { + let sent_count = std::mem::take(&mut sent_count); + let expected_count = if sent_count < 128 { + // The messages fit into the buffer and are delivered as-is + sent_count + } else { + // The buffer has overflowed and all routes have been compressed into one + 1 + }; + for _ in 0..expected_count { + let r = tokio::time::timeout(Duration::from_secs(1), table_stream.next()) + .await + .unwrap() + .unwrap(); + table_out.extend(Some(r)); + } + + // The last value received is the last value inserted into table_in + assert_eq!( + table_out + .table + .exact(&prefix) + .into_iter() + .flat_map(|r| r.iter()) + .map(|(_num, r)| decompress_route_attrs(r).nexthop) + .collect::>(), + vec![Some([10u8, 0u8, 0u8, j - 1].into())] + ); + } + } + } +} From 8cdd825a4177e974dc8db2f056e2df47ae0d5688 Mon Sep 17 00:00:00 2001 From: Yureka Date: Mon, 20 Jan 2025 13:26:41 +0100 Subject: [PATCH 7/9] get rid of separate TableType use RouteState instead, which makes the TableSelector fully (de)serializable --- frontend/src/resultsView.js | 50 ++++++++++++------------------------- src/bgp_collector.rs | 6 ++--- src/bmp_collector.rs | 14 +++++------ src/store.rs | 43 ++++--------------------------- src/store_impl.rs | 2 +- 5 files changed, 30 insertions(+), 85 deletions(-) diff --git a/frontend/src/resultsView.js b/frontend/src/resultsView.js index a578da3..b43f666 100644 --- a/frontend/src/resultsView.js +++ b/frontend/src/resultsView.js @@ -88,56 +88,38 @@ const processResults = (results) => { const asnMap = Object.fromEntries(asnResults.map(r => [r.asn, r.asn_name ])); const communityMap = Object.fromEntries(communityResults.map(r => [r.community, r.community_description ])); - // stage 1, combine pre- and post-policy adj-in tables - // start out with PostPolicy - const preAndPostPolicy = {}; - const preAndPostPolicyKey = route => `${route.session_id.from_client}:${route.session_id.peer_address}:${route.net}`; + // stage 1, combine seen and accepted routes + // start out with Accepted + const seenAndAccepted = {}; + const seenAndAcceptedKey = route => `${route.session_id.from_client}:${route.session_id.peer_address}:${route.net}`; for (let route of routeResults) { - if (route.type === "PostPolicyAdjIn") { - preAndPostPolicy[preAndPostPolicyKey(route)] = route; + if (route.state === "Accepted") { + seenAndAccepted[seenAndAcceptedKey(route)] = route; } } // add routes which are _only_ in PrePolicy => have not been accepted for (let route of routeResults) { - if (route.type === "PrePolicyAdjIn") { - const key = preAndPostPolicyKey(route); - if (!preAndPostPolicy[key]) { - preAndPostPolicy[key] = route; - preAndPostPolicy[key].state = "Filtered"; + if (route.type === "Seen") { + const key = seenAndAcceptedKey(route); + if (!seenAndAccepted[key]) { + seenAndAccepted[key] = route; + seenAndAccepted[key].state = "Filtered"; } } } - // stage 2, combine adj-in and loc-rib + // stage 2, combine Seen/Accepted and Accepted/Active/Selected (add-paths export / loc-rib) const all = {}; const allKey = route => `${route.client_name}:${route.net}:${JSON.stringify(route.as_path)}:${JSON.stringify(route.large_communities)}:${route.nexthop}`; - for (let route of Object.values(preAndPostPolicy)) { + for (let route of Object.values(seenAndAccepted)) { const key = allKey(route); all[key] = route; } - for (let route of routeResults) { - if (route.table === "LocRib" && route.state === "Accepted") { - const key = allKey(route); - if (all[key]) - all[key].state = "Accepted"; - else - all[key] = route; - } - } - for (let route of routeResults) { - if (route.table === "LocRib" && route.state === "Active") { - const key = allKey(route); - if (all[key]) - all[key].state = "Active"; - else - all[key] = route; - } - } - for (let route of routeResults) { - if (route.table === "LocRib" && route.state === "Selected") { + for (let state of ["Accepted", "Active", "Selected"]) { + for (let route of routeResults.filter(route => route.state === state)) { const key = allKey(route); if (all[key]) - all[key].state = "Selected"; + all[key].state = state; else all[key] = route; } diff --git a/src/bgp_collector.rs b/src/bgp_collector.rs index 1e38dd5..b8508f5 100644 --- a/src/bgp_collector.rs +++ b/src/bgp_collector.rs @@ -1,6 +1,6 @@ use crate::bgpdumper::BgpDumper; use crate::route_distinguisher::RouteDistinguisher; -use crate::store::{Client, RouteState, SessionId, Store, TableSelector, TableType}; +use crate::store::{Client, RouteState, SessionId, Store, TableSelector}; use futures_util::future::join_all; use futures_util::TryStreamExt; use log::*; @@ -83,9 +83,7 @@ pub async fn run_peer( from_client: client_addr, peer_address: client_addr.ip(), }, - table_type: TableType::LocRib { - route_state: cfg.route_state, - }, + route_state: cfg.route_state, route_distinguisher: RouteDistinguisher::Default, }, update, diff --git a/src/bmp_collector.rs b/src/bmp_collector.rs index 788e575..da16d38 100644 --- a/src/bmp_collector.rs +++ b/src/bmp_collector.rs @@ -1,5 +1,5 @@ use crate::route_distinguisher::RouteDistinguisher; -use crate::store::{Client, RouteState, Session, SessionId, Store, TableSelector, TableType}; +use crate::store::{Client, RouteState, Session, SessionId, Store, TableSelector}; use bitvec::prelude::Msb0; use bitvec::view::BitView; use futures_util::future::join_all; @@ -20,12 +20,10 @@ fn table_selector_for_peer( client_addr: SocketAddr, peer: &BmpMessagePeerHeader, ) -> Option { - let table_type = match (peer.peertype, peer.flags.view_bits::()[1]) { - (0, false) | (1, false) => TableType::PrePolicyAdjIn, - (0, true) | (1, true) => TableType::PostPolicyAdjIn, - (3, _) => TableType::LocRib { - route_state: RouteState::Selected, - }, + let route_state = match (peer.peertype, peer.flags.view_bits::()[1]) { + (0, false) | (1, false) => RouteState::Seen, + (0, true) | (1, true) => RouteState::Accepted, + (3, _) => RouteState::Selected, _ => return None, }; @@ -37,7 +35,7 @@ fn table_selector_for_peer( Some(TableSelector { route_distinguisher, - table_type, + route_state, session_id: SessionId { from_client: client_addr, peer_address: peer.peeraddress, diff --git a/src/store.rs b/src/store.rs index 2f87568..e2bdfdf 100644 --- a/src/store.rs +++ b/src/store.rs @@ -49,41 +49,15 @@ pub enum RouteState { Selected, } -#[derive(Debug, PartialEq, Eq, Hash, Clone, Deserialize)] -#[serde(deny_unknown_fields)] -pub enum TableType { - PrePolicyAdjIn, - PostPolicyAdjIn, - LocRib { - #[serde(skip_serializing)] - route_state: RouteState, - }, -} - -impl Serialize for TableType { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - let table_type = match self { - TableType::PrePolicyAdjIn => "PrePolicyAdjIn", - TableType::PostPolicyAdjIn => "PostPolicyAdjIn", - TableType::LocRib { .. } => "LocRib", - }; - - serializer.serialize_str(table_type) - } -} - #[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)] #[serde(deny_unknown_fields)] pub struct TableSelector { // None equal default Routing Instance #[serde(skip_serializing_if = "RouteDistinguisher::is_default")] + #[serde(default)] pub route_distinguisher: RouteDistinguisher, pub session_id: SessionId, - #[serde(rename = "type")] - pub table_type: TableType, + pub route_state: RouteState, } impl TableSelector { @@ -91,16 +65,9 @@ impl TableSelector { &self.session_id.from_client } pub fn session_id(&self) -> Option<&SessionId> { - match self.table_type { - TableType::LocRib { .. } => None, - _ => Some(&self.session_id), - } - } - pub fn route_state(&self) -> RouteState { - match self.table_type { - TableType::LocRib { route_state, .. } => route_state, - TableType::PostPolicyAdjIn => RouteState::Accepted, - TableType::PrePolicyAdjIn => RouteState::Seen, + match self.route_state { + RouteState::Seen | RouteState::Accepted => Some(&self.session_id), + _ => None, } } } diff --git a/src/store_impl.rs b/src/store_impl.rs index 8c400fd..32cfc5d 100644 --- a/src/store_impl.rs +++ b/src/store_impl.rs @@ -202,7 +202,7 @@ impl Store for InMemoryStore { }); Some(QueryResult { - state: table.route_state(), + state: table.route_state, net, table, attrs: decompress_route_attrs(&attrs), From fb2e7ea8bbefa57d5ed3954da4ff622060440f9d Mon Sep 17 00:00:00 2001 From: Yureka Date: Mon, 20 Jan 2025 15:59:10 +0100 Subject: [PATCH 8/9] don't expose client source port --- frontend/src/resultsView.js | 2 +- src/api.rs | 16 ++++++++++-- src/bgp_collector.rs | 13 +++++++--- src/bmp_collector.rs | 38 +++++++++++++++++++-------- src/main.rs | 22 ++++++++++------ src/store.rs | 24 ++++++++++------- src/store_impl.rs | 52 ++++++++++++++++++++++--------------- 7 files changed, 111 insertions(+), 56 deletions(-) diff --git a/frontend/src/resultsView.js b/frontend/src/resultsView.js index b43f666..2795874 100644 --- a/frontend/src/resultsView.js +++ b/frontend/src/resultsView.js @@ -91,7 +91,7 @@ const processResults = (results) => { // stage 1, combine seen and accepted routes // start out with Accepted const seenAndAccepted = {}; - const seenAndAcceptedKey = route => `${route.session_id.from_client}:${route.session_id.peer_address}:${route.net}`; + const seenAndAcceptedKey = route => `${route.session_id.from_client}:${route.session_id.listener}:${route.session_id.peer_address}:${route.net}`; for (let route of routeResults) { if (route.state === "Accepted") { seenAndAccepted[seenAndAcceptedKey(route)] = route; diff --git a/src/api.rs b/src/api.rs index 4ec9bf4..43ce8b8 100644 --- a/src/api.rs +++ b/src/api.rs @@ -340,7 +340,14 @@ async fn tables(State(AppState { store, .. }): State>) -> serde_json::to_string(&store.get_tables()).unwrap() } async fn routers(State(AppState { store, .. }): State>) -> impl IntoResponse { - serde_json::to_string(&store.get_routers()).unwrap() + serde_json::to_string( + &store + .get_routers() + .into_iter() + .map(|(k, v)| (format!("{},{}", k.0, k.1), v)) + .collect::>(), + ) + .unwrap() } async fn routing_instances( @@ -349,7 +356,12 @@ async fn routing_instances( let instances = store .get_routing_instances() .into_iter() - .map(|(k, v)| (k, v.into_iter().map(|v| (v, v)).collect::>())) + .map(|(k, v)| { + ( + format!("{},{}", k.0, k.1), + v.into_iter().map(|v| (v, v)).collect::>(), + ) + }) .collect::>(); serde_json::to_string(&instances).unwrap() diff --git a/src/bgp_collector.rs b/src/bgp_collector.rs index b8508f5..c1af926 100644 --- a/src/bgp_collector.rs +++ b/src/bgp_collector.rs @@ -21,6 +21,7 @@ pub async fn run_peer( store: impl Store, stream: TcpStream, client_addr: SocketAddr, + listener_name: String, ) -> anyhow::Result { let mut caps = vec![ BgpCapability::SafiIPv4u, @@ -61,7 +62,8 @@ pub async fn run_peer( .unwrap_or(client_addr.ip().to_string()); store .client_up( - client_addr, + client_addr.ip(), + listener_name.clone(), cfg.route_state, Client { client_name, @@ -80,7 +82,8 @@ pub async fn run_peer( .insert_bgp_update( TableSelector { session_id: SessionId { - from_client: client_addr, + from_client: client_addr.ip(), + listener: listener_name.clone(), peer_address: client_addr.ip(), }, route_state: cfg.route_state, @@ -111,6 +114,7 @@ pub struct BgpCollectorConfig { } pub async fn run( + name: String, cfg: BgpCollectorConfig, store: impl Store, mut shutdown: tokio::sync::watch::Receiver, @@ -125,10 +129,11 @@ pub async fn run( if let Some(peer_cfg) = cfg.peers.get(&client_addr.ip()).or(cfg.default_peer_config.as_ref()).cloned() { let store = store.clone(); + let name = name.clone(); let mut shutdown = shutdown.clone(); running_tasks.push(tokio::spawn(async move { tokio::select! { - res = run_peer(peer_cfg, store.clone(), io, client_addr) => { + res = run_peer(peer_cfg, store.clone(), io, client_addr, name.clone()) => { match res { Err(e) => warn!("disconnected {} {}", client_addr, e), Ok(notification) => info!("disconnected {} {:?}", client_addr, notification), @@ -137,7 +142,7 @@ pub async fn run( _ = shutdown.changed() => { } }; - store.client_down(client_addr).await; + store.client_down(client_addr.ip(), name.clone()).await; })); } else { info!("unexpected connection from {}", client_addr); diff --git a/src/bmp_collector.rs b/src/bmp_collector.rs index da16d38..0e488ed 100644 --- a/src/bmp_collector.rs +++ b/src/bmp_collector.rs @@ -18,6 +18,7 @@ use zettabgp::bmp::BmpMessage; fn table_selector_for_peer( client_addr: SocketAddr, + listener_name: String, peer: &BmpMessagePeerHeader, ) -> Option { let route_state = match (peer.peertype, peer.flags.view_bits::()[1]) { @@ -37,7 +38,8 @@ fn table_selector_for_peer( route_distinguisher, route_state, session_id: SessionId { - from_client: client_addr, + from_client: client_addr.ip(), + listener: listener_name, peer_address: peer.peeraddress, }, }) @@ -46,9 +48,10 @@ fn table_selector_for_peer( async fn process_route_monitoring( store: &impl Store, client_addr: SocketAddr, + listener_name: String, rm: BmpMessageRouteMonitoring, ) { - let session = match table_selector_for_peer(client_addr, &rm.peer) { + let session = match table_selector_for_peer(client_addr, listener_name, &rm.peer) { Some(session) => session, None => { trace!( @@ -65,6 +68,7 @@ async fn process_route_monitoring( pub fn run_peer( client_addr: SocketAddr, + listener_name: String, peer: BmpMessagePeerHeader, store: &impl Store, ) -> mpsc::Sender> { @@ -73,7 +77,7 @@ pub fn run_peer( tokio::task::spawn(async move { trace!("{} {:?}", client_addr, peer); - if let Some(session_id) = table_selector_for_peer(client_addr, &peer) + if let Some(session_id) = table_selector_for_peer(client_addr, listener_name.clone(), &peer) .and_then(|store| store.session_id().cloned()) { store.session_up(session_id, Session {}).await; @@ -82,7 +86,7 @@ pub fn run_peer( loop { match rx.recv().await { Some(Ok(rm)) => { - process_route_monitoring(&store, client_addr, rm).await; + process_route_monitoring(&store, client_addr, listener_name.clone(), rm).await; } Some(Err(down_msg)) => { trace!("{} {:?}", client_addr, down_msg); @@ -94,7 +98,7 @@ pub fn run_peer( } } } - if let Some(session_id) = table_selector_for_peer(client_addr, &peer) + if let Some(session_id) = table_selector_for_peer(client_addr, listener_name.clone(), &peer) .and_then(|store| store.session_id().cloned()) { store.session_down(session_id, None).await; @@ -107,6 +111,7 @@ pub async fn run_client( cfg: PeerConfig, io: TcpStream, client_addr: SocketAddr, + listener_name: String, store: &impl Store, ) -> anyhow::Result { let read = LengthDelimitedCodec::builder() @@ -151,7 +156,12 @@ pub async fn run_client( > = HashMap::new(); channels.insert( first_peer_up.peer.peeraddress, - run_peer(client_addr, first_peer_up.peer, store), + run_peer( + client_addr, + listener_name.clone(), + first_peer_up.peer, + store, + ), ); let client_name = cfg .name_override @@ -159,7 +169,8 @@ pub async fn run_client( .unwrap_or(client_addr.ip().to_string()); store .client_up( - client_addr, + client_addr.ip(), + listener_name.clone(), RouteState::Selected, Client { client_name, @@ -178,12 +189,15 @@ pub async fn run_client( BmpMessage::RouteMonitoring(rm) => { let channel = channels.entry(rm.peer.peeraddress).or_insert_with(|| { warn!("the bmp device {} sent a message for a nonexisting peer, we'll initialize the table now: {:?}", &client_addr, &rm); - run_peer(client_addr, rm.peer.clone(), store) + run_peer(client_addr, listener_name.clone(), rm.peer.clone(), store) }); channel.send(Ok(rm)).await.unwrap(); } BmpMessage::PeerUpNotification(n) => { - channels.insert(n.peer.peeraddress, run_peer(client_addr, n.peer, store)); + channels.insert( + n.peer.peeraddress, + run_peer(client_addr, listener_name.clone(), n.peer, store), + ); } BmpMessage::PeerDownNotification(n) => match channels.remove(&n.peer.peeraddress) { Some(channel) => channel.send(Err(n)).await.unwrap(), @@ -209,6 +223,7 @@ pub struct BmpCollectorConfig { } pub async fn run( + name: String, cfg: BmpCollectorConfig, store: impl Store, mut shutdown: tokio::sync::watch::Receiver, @@ -222,11 +237,12 @@ pub async fn run( info!("connected {:?}", client_addr); let store = store.clone(); + let name = name.clone(); let mut shutdown = shutdown.clone(); if let Some(peer_cfg) = cfg.peers.get(&client_addr.ip()).or(cfg.default_peer_config.as_ref()).cloned() { running_tasks.push(tokio::spawn(async move { tokio::select! { - res = run_client(peer_cfg, io, client_addr, &store) => { + res = run_client(peer_cfg, io, client_addr, name.clone(), &store) => { match res { Err(e) => warn!("disconnected {} {}", client_addr, e), Ok(notification) => info!("disconnected {} {:?}", client_addr, notification), @@ -235,7 +251,7 @@ pub async fn run( _ = shutdown.changed() => { } }; - store.client_down(client_addr).await; + store.client_down(client_addr.ip(), name.clone()).await; })); } else { info!("unexpected connection from {}", client_addr); diff --git a/src/main.rs b/src/main.rs index 69e22f3..3199c11 100644 --- a/src/main.rs +++ b/src/main.rs @@ -44,14 +44,20 @@ async fn main() -> anyhow::Result<()> { futures.extend( cfg.collectors - .into_values() - .map(|collector| match collector { - CollectorConfig::Bmp(cfg) => { - tokio::task::spawn(bmp_collector::run(cfg, store.clone(), shutdown_rx.clone())) - } - CollectorConfig::Bgp(cfg) => { - tokio::task::spawn(bgp_collector::run(cfg, store.clone(), shutdown_rx.clone())) - } + .into_iter() + .map(|(name, collector)| match collector { + CollectorConfig::Bmp(cfg) => tokio::task::spawn(bmp_collector::run( + name, + cfg, + store.clone(), + shutdown_rx.clone(), + )), + CollectorConfig::Bgp(cfg) => tokio::task::spawn(bgp_collector::run( + name, + cfg, + store.clone(), + shutdown_rx.clone(), + )), }), ); diff --git a/src/store.rs b/src/store.rs index e2bdfdf..5b89e29 100644 --- a/src/store.rs +++ b/src/store.rs @@ -3,7 +3,7 @@ use ipnet::{IpNet, Ipv4Net, Ipv6Net}; use log::*; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::net::{IpAddr, Ipv4Addr}; use std::pin::Pin; use zettabgp::prelude::{BgpAddrV4, BgpAddrV6}; @@ -33,7 +33,8 @@ pub struct RouteAttrs { #[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)] #[serde(deny_unknown_fields)] pub struct SessionId { - pub from_client: SocketAddr, + pub from_client: IpAddr, + pub listener: String, pub peer_address: IpAddr, } @@ -61,8 +62,11 @@ pub struct TableSelector { } impl TableSelector { - pub fn client_addr(&self) -> &SocketAddr { - &self.session_id.from_client + pub fn client_id(&self) -> (IpAddr, String) { + ( + self.session_id.from_client, + self.session_id.listener.clone(), + ) } pub fn session_id(&self) -> Option<&SessionId> { match self.route_state { @@ -76,7 +80,7 @@ impl TableSelector { pub enum TableQuery { Table(TableSelector), Session(SessionId), - Client(SocketAddr), + Client(IpAddr, String), Router(RouterId), } @@ -164,20 +168,22 @@ pub trait Store: Clone + Send + Sync + 'static { fn get_tables(&self) -> Vec; - fn get_routers(&self) -> HashMap; + fn get_routers(&self) -> HashMap<(IpAddr, String), Client>; - fn get_routing_instances(&self) -> HashMap>; + fn get_routing_instances(&self) -> HashMap<(IpAddr, String), HashSet>; fn client_up( &self, - client_addr: SocketAddr, + client_ip: IpAddr, + listener: String, route_state: RouteState, client_data: Client, ) -> impl std::future::Future + std::marker::Send; fn client_down( &self, - client_addr: SocketAddr, + client_ip: IpAddr, + listener: String, ) -> impl std::future::Future + std::marker::Send; fn session_up( diff --git a/src/store_impl.rs b/src/store_impl.rs index 32cfc5d..2f7781d 100644 --- a/src/store_impl.rs +++ b/src/store_impl.rs @@ -7,7 +7,7 @@ use rayon::iter::ParallelIterator; use regex::Regex; use std::collections::HashMap; use std::collections::HashSet; -use std::net::SocketAddr; +use std::net::IpAddr; use std::pin::Pin; use std::sync::Arc; use std::sync::Mutex; @@ -20,16 +20,17 @@ use crate::table_impl::*; #[derive(Default, Clone)] pub struct InMemoryStore { - clients: Arc>>, + clients: Arc>>, sessions: Arc>>, tables: Arc>>, caches: Arc>, } fn tables_for_client_fn( - query_from_client: &SocketAddr, + client_ip: IpAddr, + listener: &str, ) -> impl Fn(&(&TableSelector, &InMemoryTable)) -> bool + '_ { - move |(k, _): &(_, _)| k.client_addr() == query_from_client + move |(k, _): &(_, _)| k.client_id() == (client_ip, listener.to_string()) } fn tables_for_session_fn( @@ -48,7 +49,7 @@ impl InMemoryStore { &clients .lock() .unwrap() - .get(k.client_addr()) + .get(&k.client_id()) .unwrap() .router_id == query_router_id @@ -64,13 +65,14 @@ impl InMemoryStore { } fn get_tables_for_client( &self, - client_addr: &SocketAddr, + client_ip: IpAddr, + listener: &str, ) -> Vec<(TableSelector, InMemoryTable)> { self.tables .lock() .unwrap() .iter() - .filter(tables_for_client_fn(client_addr)) + .filter(tables_for_client_fn(client_ip, listener)) .map(|(k, v)| (k.clone(), v.clone())) .collect() } @@ -119,7 +121,9 @@ impl Store for InMemoryStore { fn get_routes(&self, query: Query) -> Pin + Send>> { let mut tables = match query.table_query { Some(TableQuery::Table(table)) => vec![(table.clone(), self.get_table(table))], - Some(TableQuery::Client(client_addr)) => self.get_tables_for_client(&client_addr), + Some(TableQuery::Client(client_addr, listener)) => { + self.get_tables_for_client(client_addr, &listener) + } Some(TableQuery::Router(router_id)) => self.get_tables_for_router(&router_id), Some(TableQuery::Session(session_id)) => self.get_tables_for_session(&session_id), None => self.tables.lock().unwrap().clone().into_iter().collect(), @@ -190,7 +194,7 @@ impl Store for InMemoryStore { let clients = clients.clone(); let sessions = sessions.clone(); async move { - let client = match clients.lock().unwrap().get(table.client_addr()) { + let client = match clients.lock().unwrap().get(&table.client_id()) { Some(v) => v.clone(), None => { warn!("client is not connected"); @@ -219,43 +223,49 @@ impl Store for InMemoryStore { self.tables.lock().unwrap().keys().cloned().collect() } - fn get_routers(&self) -> HashMap { + fn get_routers(&self) -> HashMap<(IpAddr, String), Client> { self.clients.lock().unwrap().clone() } - fn get_routing_instances(&self) -> HashMap> { + fn get_routing_instances(&self) -> HashMap<(IpAddr, String), HashSet> { let tables = self.tables.lock().unwrap().clone(); let mut hm = HashMap::new(); for table_selector in tables.into_keys() { - hm.entry(table_selector.session_id.from_client) - .or_insert(HashSet::new()) - .insert(table_selector.route_distinguisher); + hm.entry(( + table_selector.session_id.from_client, + table_selector.session_id.listener, + )) + .or_insert(HashSet::new()) + .insert(table_selector.route_distinguisher); } - hm } async fn client_up( &self, - client_addr: SocketAddr, + client_ip: IpAddr, + listener: String, _route_state: RouteState, client_data: Client, ) { self.clients .lock() .unwrap() - .insert(client_addr, client_data); + .insert((client_ip, listener), client_data); } - async fn client_down(&self, client_addr: SocketAddr) { - self.clients.lock().unwrap().remove(&client_addr); + async fn client_down(&self, client_ip: IpAddr, listener: String) { + self.clients + .lock() + .unwrap() + .remove(&(client_ip, listener.clone())); self.sessions .lock() .unwrap() - .retain(|k, _| k.from_client != client_addr); + .retain(|k, _| !(k.from_client == client_ip && k.listener == listener)); self.tables .lock() .unwrap() - .retain(|k, v| !(tables_for_client_fn(&client_addr)(&(k, v)))); + .retain(|k, v| !(tables_for_client_fn(client_ip, &listener)(&(k, v)))); self.caches.lock().unwrap().remove_expired(); } From 8bff20c031bca0e407d7b220dac96500a477fcb0 Mon Sep 17 00:00:00 2001 From: Yureka Date: Sun, 19 Jan 2025 17:31:06 +0100 Subject: [PATCH 9/9] Add bmp relay feature --- Cargo.lock | 3 +- Cargo.toml | 2 +- config.example.yml | 2 +- src/bmp_relay.rs | 165 +++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 3 + src/main.rs | 4 ++ src/store.rs | 99 +++++++++++++++++++++++++++ src/store_impl.rs | 2 +- 8 files changed, 275 insertions(+), 5 deletions(-) create mode 100644 src/bmp_relay.rs diff --git a/Cargo.lock b/Cargo.lock index 62a21de..86781b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2128,8 +2128,7 @@ checksum = "1367295b8f788d371ce2dbc842c7b709c73ee1364d30351dd300ec2203b12377" [[package]] name = "zettabgp" version = "0.3.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "459d02087e2d98c5311bf7b7cb932622c649c33881f4243e564cac4a744a0ca5" +source = "git+https://github.com/wladwm/zettabgp#925c0bc9e4eaefca11dc7c5fb93a99574762bf28" dependencies = [ "log", "serde", diff --git a/Cargo.toml b/Cargo.toml index 26d0305..58fc550 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,7 +29,7 @@ tokio-util = { version = "0.7", features = ["codec"] } weak-table = "0.3" nibbletree = { version = "0.2", path = "./nibbletree", features = ["ipnet"] } autometrics = { version = "0.3", features = ["prometheus-exporter"] } -zettabgp = "0.3.4" +zettabgp = { version = "0.3.4", git = "https://github.com/wladwm/zettabgp" } hickory-resolver = "0.24" include_dir = { version = "0.7", optional = true } mime_guess = { version = "2.0", optional = true } diff --git a/config.example.yml b/config.example.yml index 7c79c93..d5ff84e 100644 --- a/config.example.yml +++ b/config.example.yml @@ -6,4 +6,4 @@ collectors: collector_type: Bmp bind: "[::]:11019" peers: - "192.0.2.1": {} + "2a0e:b940:0:2:a00e:f9ff:fe1b:b7e9": {} diff --git a/src/bmp_relay.rs b/src/bmp_relay.rs new file mode 100644 index 0000000..d39fcfc --- /dev/null +++ b/src/bmp_relay.rs @@ -0,0 +1,165 @@ +use crate::compressed_attrs::decompress_route_attrs; +use crate::store::make_bgp_withdraw; +use crate::store::TableSelector; +use crate::store_impl::InMemoryStore; +use crate::table_impl::Action; +use crate::table_impl::InMemoryTable; +use crate::table_stream::table_stream; +use futures_util::pin_mut; +use futures_util::StreamExt; +use serde::Deserialize; +use std::net::IpAddr; +use std::net::Ipv4Addr; +use std::net::SocketAddr; +use std::time::Duration; +use tokio::io::AsyncWriteExt; +use tokio::net::TcpSocket; +use tokio::net::TcpStream; +use zettabgp::bmp::prelude::*; +use zettabgp::prelude::*; + +#[derive(Debug, Deserialize)] +pub struct RelayConfig { + table: TableSelector, + + /// For LocRIB fake BGP open message + router_id: Ipv4Addr, + asn: u32, + + monitoring_station: SocketAddr, + + bind_addr: Option, + bind_port: Option, +} + +async fn connect(cfg: &RelayConfig) -> std::io::Result { + let (sock, default_bind_addr) = match cfg.monitoring_station.ip() { + IpAddr::V4(_) => (TcpSocket::new_v4()?, "0.0.0.0".parse().unwrap()), + IpAddr::V6(_) => (TcpSocket::new_v6()?, "::".parse().unwrap()), + }; + let bind_addr = SocketAddr::new( + cfg.bind_addr.unwrap_or(default_bind_addr), + cfg.bind_port.unwrap_or(0), + ); + sock.bind(bind_addr)?; + sock.connect(cfg.monitoring_station).await +} + +async fn run_connection(cfg: &RelayConfig, table: &InMemoryTable, mut tcp_stream: TcpStream) { + let mut buf = [0; 10000]; + let updates_stream = table_stream(table); + pin_mut!(updates_stream); + + let fake_open_message = BgpOpenMessage { + as_num: cfg.asn, + caps: vec![ + BgpCapability::SafiIPv4u, + BgpCapability::SafiIPv6u, + BgpCapability::SafiVPNv4u, + BgpCapability::SafiVPNv6u, + BgpCapability::CapRR, + BgpCapability::CapASN32(cfg.asn), + ], + hold_time: 0, + router_id: cfg.router_id, + }; + let peer_hdr = BmpMessagePeerHeader { + peertype: 3, + flags: 0, + peerdistinguisher: BgpRD::new(0, 0), + peeraddress: "::".parse().unwrap(), + asnum: cfg.asn, + routerid: cfg.router_id, + timestamp: 0, + }; + let mut bmp_messages = futures_util::stream::iter([ + BmpMessage::Initiation(BmpMessageInitiation { + str0: None, + sys_descr: None, + sys_name: None, + }), + BmpMessage::PeerUpNotification(BmpMessagePeerUp { + peer: peer_hdr.clone(), + localaddress: "::".parse().unwrap(), + localport: 0, + remoteport: 0, + msg1: fake_open_message.clone(), + msg2: fake_open_message, + }), + ]) + .chain(updates_stream.map(|action| { + let update = match action { + (net, num, Action::Withdraw) => { + if num != 0 { + log::warn!("add-paths table is not yet implemented"); + } + make_bgp_withdraw(net) + } + (net, num, Action::Update(attrs)) => { + if num != 0 { + log::warn!("add-paths table is not yet implemented"); + } + decompress_route_attrs(&attrs).to_bgp_update(net) + } + }; + + BmpMessage::RouteMonitoring(BmpMessageRouteMonitoring { + peer: peer_hdr.clone(), + update, + }) + })); + + while let Some(bmp_msg) = bmp_messages.next().await { + log::trace!("sending message {}: {:?}", cfg.monitoring_station, bmp_msg); + let mut len = 0; + match bmp_msg.encode_to(&mut buf[5..]) { + Ok(i) => len += i, + Err(e) => { + log::warn!("error encoding BMP message {:?}: {}", bmp_msg, e); + continue; + } + } + let msg_hdr = BmpMessageHeader { + version: 3, + msglength: len + 5, + }; + len += msg_hdr.encode_to(&mut buf).unwrap(); + + if let Err(e) = tcp_stream.write_all(&buf[..len]).await { + log::warn!( + "resetting connection {:?}, reason: {}", + cfg.monitoring_station, + e + ); + return; + } + } +} + +async fn run_(cfg: RelayConfig, store: InMemoryStore) -> ! { + let table = store.get_table(cfg.table.clone()); + loop { + let tcp_stream = match connect(&cfg).await { + Err(_) => { + log::info!("trying to connect {}", cfg.monitoring_station); + tokio::time::sleep(Duration::from_secs(5)).await; + continue; + } + Ok(v) => v, + }; + log::info!("connected {}", cfg.monitoring_station); + + run_connection(&cfg, &table, tcp_stream).await; + } +} + +pub async fn run( + cfg: RelayConfig, + store: InMemoryStore, + mut shutdown: tokio::sync::watch::Receiver, +) -> anyhow::Result<()> { + tokio::select! { + _ = run_(cfg, store) => unreachable!(), + _ = shutdown.changed() => Ok(()), + } +} diff --git a/src/lib.rs b/src/lib.rs index c829180..59c7592 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,7 @@ pub mod api; pub mod bgp_collector; mod bgpdumper; pub mod bmp_collector; +pub mod bmp_relay; mod compressed_attrs; pub mod route_distinguisher; pub mod store; @@ -42,4 +43,6 @@ pub struct Config { /// Only check config and exit #[serde(default)] pub config_check: bool, + #[serde(default)] + pub bmp_relays: HashMap, } diff --git a/src/main.rs b/src/main.rs index 3199c11..c82329c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -61,6 +61,10 @@ async fn main() -> anyhow::Result<()> { }), ); + futures.extend(cfg.bmp_relays.into_values().map(|relay| { + tokio::task::spawn(bmp_relay::run(relay, store.clone(), shutdown_rx.clone())) + })); + let mut sigint = signal(SignalKind::interrupt())?; let mut sigterm = signal(SignalKind::terminate())?; let res = tokio::select! { diff --git a/src/store.rs b/src/store.rs index 5b89e29..4660713 100644 --- a/src/store.rs +++ b/src/store.rs @@ -148,6 +148,84 @@ impl Default for QueryLimits { } } +pub fn make_bgp_withdraw(net: IpNet) -> zettabgp::prelude::BgpUpdateMessage { + use zettabgp::prelude::*; + + BgpUpdateMessage { + attrs: vec![BgpAttrItem::MPWithdraws(BgpMPWithdraws { + addrs: net_to_bgp_addrs(net), + })], + ..Default::default() + } +} + +impl RouteAttrs { + pub fn to_bgp_update(&self, net: IpNet) -> zettabgp::prelude::BgpUpdateMessage { + use zettabgp::prelude::*; + + let mut attrs = vec![]; + + if let Some(nexthop) = self.nexthop { + attrs.push(BgpAttrItem::MPUpdates(BgpMPUpdates { + nexthop: std_addr_to_bgp_addr(nexthop), + addrs: net_to_bgp_addrs(net), + })); + } else { + warn!("Can not build MPUpdates without nexthop"); + } + + if let Some(communities) = &self.communities { + attrs.push(BgpAttrItem::CommunityList(BgpCommunityList { + value: communities + .iter() + .map(|(high, low)| BgpCommunity::new(((*high as u32) << 16) + *low as u32)) + .collect(), + })); + } + if let Some(large_communities) = &self.large_communities { + attrs.push(BgpAttrItem::LargeCommunityList(BgpLargeCommunityList { + value: large_communities + .iter() + .cloned() + .map(|(ga, ldp1, ldp2)| BgpLargeCommunity { ga, ldp1, ldp2 }) + .collect(), + })); + } + + if let Some(med) = self.med { + attrs.push(BgpAttrItem::MED(BgpMED { value: med })); + } + if let Some(local_pref) = self.local_pref { + attrs.push(BgpAttrItem::LocalPref(BgpLocalpref { value: local_pref })); + } + + if let Some(origin) = &self.origin { + attrs.push(BgpAttrItem::Origin(BgpOrigin { + value: match origin { + RouteOrigin::Igp => BgpAttrOrigin::Igp, + RouteOrigin::Egp => BgpAttrOrigin::Egp, + RouteOrigin::Incomplete => BgpAttrOrigin::Incomplete, + }, + })); + } + + if let Some(as_path) = &self.as_path { + attrs.push(BgpAttrItem::ASPath(BgpASpath { + value: as_path + .iter() + .cloned() + .map(|value| BgpAS { value }) + .collect(), + })); + } + + BgpUpdateMessage { + attrs, + ..Default::default() + } + } +} + pub trait Store: Clone + Send + Sync + 'static { fn update_route( &self, @@ -384,6 +462,27 @@ fn bgp_addrs_to_nets( } } +fn std_addr_to_bgp_addr(net: IpAddr) -> zettabgp::prelude::BgpAddr { + use zettabgp::prelude::*; + match net { + IpAddr::V4(v4) => BgpAddr::V4(v4), + IpAddr::V6(v6) => BgpAddr::V6(v6), + } +} +fn net_to_bgp_addrs(net: IpNet) -> zettabgp::prelude::BgpAddrs { + use zettabgp::prelude::*; + match net { + IpNet::V4(v4) => BgpAddrs::IPV4U(vec![BgpAddrV4 { + addr: v4.addr(), + prefixlen: v4.prefix_len(), + }]), + IpNet::V6(v6) => BgpAddrs::IPV6U(vec![BgpAddrV6 { + addr: v6.addr(), + prefixlen: v6.prefix_len(), + }]), + } +} + fn bgpv4addr_to_ipnet(addr: &BgpAddrV4) -> Option { Ipv4Net::new(addr.addr, addr.prefixlen) .inspect_err(|_| warn!("invalid BgpAddrs prefixlen")) diff --git a/src/store_impl.rs b/src/store_impl.rs index 2f7781d..67eca0d 100644 --- a/src/store_impl.rs +++ b/src/store_impl.rs @@ -55,7 +55,7 @@ impl InMemoryStore { == query_router_id } } - fn get_table(&self, sel: TableSelector) -> InMemoryTable { + pub fn get_table(&self, sel: TableSelector) -> InMemoryTable { self.tables .lock() .unwrap()