From 57744d216abf49402a76ba5975bfda270c97a248 Mon Sep 17 00:00:00 2001 From: tipogi Date: Mon, 2 Mar 2026 13:12:40 +0100 Subject: [PATCH 1/7] draft --- nexus-watcher/src/service/mod.rs | 115 +++++++++++++----- nexus-watcher/tests/service/mod.rs | 1 + nexus-watcher/tests/service/shutdown_guard.rs | 104 ++++++++++++++++ nexus-watcher/tests/service/utils/mod.rs | 2 + .../tests/service/utils/panicking_runner.rs | 73 +++++++++++ 5 files changed, 266 insertions(+), 29 deletions(-) create mode 100644 nexus-watcher/tests/service/shutdown_guard.rs create mode 100644 nexus-watcher/tests/service/utils/panicking_runner.rs diff --git a/nexus-watcher/src/service/mod.rs b/nexus-watcher/src/service/mod.rs index b07a6d981..f1b51a71b 100644 --- a/nexus-watcher/src/service/mod.rs +++ b/nexus-watcher/src/service/mod.rs @@ -9,6 +9,7 @@ pub use constants::{PROCESSING_TIMEOUT_SECS, WATCHER_CONFIG_FILE_NAME}; use nexus_common::types::DynError; pub use processor::EventProcessor; pub use processor_runner::EventProcessorRunner; +pub use stats::ProcessedStats; pub use traits::{TEventProcessor, TEventProcessorRunner}; use crate::NexusWatcherBuilder; @@ -19,10 +20,22 @@ use nexus_common::{DaemonConfig, WatcherConfig}; use pubky_app_specs::PubkyId; use std::path::PathBuf; use std::sync::Arc; -use tokio::sync::watch::Receiver; +use tokio::sync::watch::{Receiver, Sender}; use tokio::time::Duration; use tracing::{debug, error, info}; +/// Sends a shutdown signal when dropped. Attach to each spawned task so that if any task +/// exits (normally or via panic), the remaining tasks are notified to stop gracefully. +struct ShutdownGuard(Option>); + +impl Drop for ShutdownGuard { + fn drop(&mut self) { + if let Some(tx) = self.0.take() { + let _ = tx.send(true); + } + } +} + pub struct NexusWatcher {} impl NexusWatcher { @@ -68,34 +81,49 @@ impl NexusWatcher { NexusWatcherBuilder(watcher_config).start(shutdown_rx).await } - /// Starts the Nexus Watcher with 3 parallel loops: - /// - /// 1. **Default homeserver loop**: Processes events from the default homeserver defined in [`WatcherConfig`]. - /// 2. **External homeservers loop**: Processes events from all external monitored homeservers, excluding the default. - /// 3. **Reserved loop**: Placeholder for future use. - /// - /// All loops share the same tick interval ([`WatcherConfig::watcher_sleep`]) and listen for - /// the shutdown signal to exit gracefully. + /// Initializes configuration, persists the default homeserver, and delegates to [`Self::run_tasks`]. pub async fn start(shutdown_rx: Receiver, config: WatcherConfig) -> Result<(), DynError> { debug!(?config, "Running NexusWatcher with "); let config_hs = PubkyId::try_from(config.homeserver.as_str())?; Homeserver::persist_if_unknown(config_hs).await?; - let watcher_sleep = config.watcher_sleep; - let ev_processor_runner = EventProcessorRunner::from_config(&config, shutdown_rx.clone()); - let ev_processor_runner = Arc::new(ev_processor_runner); + let runner = EventProcessorRunner::from_config(&config, shutdown_rx.clone()); + Self::run_tasks(shutdown_rx, Arc::new(runner), config.watcher_sleep).await + } + + /// Spawns processing tasks with [`ShutdownGuard`] protection and waits for completion. + /// + /// Three parallel tasks are spawned: + /// 1. **Default homeserver task**: calls [`TEventProcessorRunner::run_default_homeserver`] each tick. + /// 2. **External homeservers task**: calls [`TEventProcessorRunner::run_external_homeservers`] each tick. + /// 3. **Shutdown forwarder task**: bridges the external `shutdown_rx` (e.g. SIGINT) into an internal channel. + /// + /// Each task holds a [`ShutdownGuard`]. If any task exits or panics, the guard drops and + /// signals all remaining tasks to shut down gracefully via the internal channel. + /// + /// Separated from [`Self::start`] to allow injection of mock runners in tests. + pub async fn run_tasks( + shutdown_rx: Receiver, + runner: Arc, + watcher_sleep: u64, + ) -> Result<(), DynError> { + // Internal channel: tasks signal each other via ShutdownGuard on exit/panic. + // The forwarder bridges the external SIGINT into this channel. + let (internal_shutdown_tx, internal_shutdown_rx) = tokio::sync::watch::channel(false); - // Thread 1: Default homeserver processing + // Task 1: Default homeserver processing let default_hs_handle = { - let runner = ev_processor_runner.clone(); - let mut shutdown = shutdown_rx.clone(); + let runner = runner.clone(); + let mut shutdown = internal_shutdown_rx.clone(); + let guard = ShutdownGuard(Some(internal_shutdown_tx.clone())); tokio::spawn(async move { + let _guard = guard; let mut interval = tokio::time::interval(Duration::from_millis(watcher_sleep)); loop { tokio::select! { _ = shutdown.changed() => { - info!("SIGINT received, exiting default homeserver loop"); + info!("Shutdown received, exiting default homeserver loop"); break; } _ = interval.tick() => { @@ -110,41 +138,70 @@ impl NexusWatcher { }) }; - // Thread 2: External homeservers processing + // Task 2: External homeservers processing let external_hss_handle = { - let runner = ev_processor_runner.clone(); - let mut shutdown = shutdown_rx.clone(); + let runner = runner.clone(); + let mut shutdown = internal_shutdown_rx.clone(); + let guard = ShutdownGuard(Some(internal_shutdown_tx.clone())); tokio::spawn(async move { + let _guard = guard; let mut interval = tokio::time::interval(Duration::from_millis(watcher_sleep)); loop { tokio::select! { _ = shutdown.changed() => { - info!("SIGINT received, exiting other homeservers loop"); + info!("Shutdown received, exiting external homeservers loop"); break; } _ = interval.tick() => { - debug!("Indexing other homeservers…"); + debug!("Indexing external homeservers…"); _ = runner .run_external_homeservers() .await - .inspect_err(|e| error!("Failed to start event processors run: {e}")); + .inspect_err(|e| error!("Failed to run external homeservers event processor: {e}")); } } } }) }; - // Thread 3: Reserved for future use - let reserved_handle = { - let mut shutdown = shutdown_rx.clone(); + // Forwarder: bridges external SIGINT into the internal shutdown channel. + // Also listens to the internal channel so it exits when a sibling task crashes. + let forwarder_handle = { + let mut external = shutdown_rx; + let mut internal = internal_shutdown_rx.clone(); + let guard = ShutdownGuard(Some(internal_shutdown_tx.clone())); tokio::spawn(async move { - // TODO: Reserved for future use - let _ = shutdown.changed().await; - info!("SIGINT received, exiting reserved loop"); + let _guard = guard; + tokio::select! { + _ = external.changed() => { + info!("SIGINT received, forwarding shutdown to watcher tasks"); + } + _ = internal.changed() => { + info!("Internal shutdown received in forwarder, exiting"); + } + } }) }; - let _ = tokio::try_join!(default_hs_handle, external_hss_handle, reserved_handle); + // Drop the original sender so only task guards and the forwarder hold senders + drop(internal_shutdown_tx); + + let (r1, r2, r3) = tokio::join!(default_hs_handle, external_hss_handle, forwarder_handle); + + if let Err(ref e) = r1 { + error!("Default homeserver task failed: {e}"); + } + if let Err(ref e) = r2 { + error!("External homeservers task failed: {e}"); + } + if let Err(ref e) = r3 { + error!("Shutdown forwarder task failed: {e}"); + } + + if r1.is_err() || r2.is_err() || r3.is_err() { + return Err("Nexus Watcher stopped: one or more tasks failed".into()); + } + info!("Nexus Watcher shut down gracefully"); Ok(()) } diff --git a/nexus-watcher/tests/service/mod.rs b/nexus-watcher/tests/service/mod.rs index 0b77f7bc8..528432cb8 100644 --- a/nexus-watcher/tests/service/mod.rs +++ b/nexus-watcher/tests/service/mod.rs @@ -1,5 +1,6 @@ pub mod event_processing_multiple_homeservers; pub mod event_processor_prioritization; pub mod mock_event_processor; +pub mod shutdown_guard; pub mod signal; pub mod utils; diff --git a/nexus-watcher/tests/service/shutdown_guard.rs b/nexus-watcher/tests/service/shutdown_guard.rs new file mode 100644 index 000000000..f114d3e1f --- /dev/null +++ b/nexus-watcher/tests/service/shutdown_guard.rs @@ -0,0 +1,104 @@ +use crate::service::utils::{ + create_random_homeservers_and_persist, setup, MockEventProcessorResult, + MockEventProcessorRunner, PanickingDefaultHsRunner, PanickingExternalHsRunner, +}; +use anyhow::Result; +use nexus_watcher::service::NexusWatcher; +use std::sync::Arc; +use std::time::Duration; +use tokio::time::sleep; + +/// Test A: Sending an external shutdown signal causes `run_tasks` to return `Ok`. +/// Verifies the forwarder task bridges the external signal into the internal channel, +/// causing all processing tasks to exit gracefully. +#[tokio_shared_rt::test(shared)] +async fn test_run_tasks_clean_shutdown_via_external_signal() -> Result<()> { + let mut event_processor_list = setup().await?; + let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false); + + for _ in 0..3 { + create_random_homeservers_and_persist( + &mut event_processor_list, + None, + MockEventProcessorResult::Success, + None, + shutdown_rx.clone(), + ) + .await; + } + + let runner = MockEventProcessorRunner::new(event_processor_list, 3, shutdown_rx.clone()); + let runner = Arc::new(runner); + + tokio::spawn(async move { + sleep(Duration::from_millis(500)).await; + let _ = shutdown_tx.send(true); + }); + + let result = NexusWatcher::run_tasks(shutdown_rx, runner, 100).await; + assert!(result.is_ok(), "run_tasks should return Ok on clean shutdown"); + + Ok(()) +} + +/// Test B: A panic in `run_default_homeserver` causes the spawned task to crash. +/// The ShutdownGuard drops, signaling the external HS task and forwarder to stop. +/// `run_tasks` returns `Err` because `tokio::join!` collects the panicked JoinHandle. +#[tokio_shared_rt::test(shared)] +async fn test_run_tasks_default_hs_panic_propagates_via_guard() -> Result<()> { + let mut event_processor_list = setup().await?; + let (_shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false); + + for _ in 0..3 { + create_random_homeservers_and_persist( + &mut event_processor_list, + None, + MockEventProcessorResult::Success, + None, + shutdown_rx.clone(), + ) + .await; + } + + let inner = MockEventProcessorRunner::new(event_processor_list, 3, shutdown_rx.clone()); + let runner = Arc::new(PanickingDefaultHsRunner { inner }); + + let result = NexusWatcher::run_tasks(shutdown_rx, runner, 100).await; + assert!( + result.is_err(), + "run_tasks should return Err when the default HS task panics" + ); + + Ok(()) +} + +/// Test C: A panic in `run_external_homeservers` causes the spawned task to crash. +/// The ShutdownGuard drops, signaling the default HS task and forwarder to stop. +/// `run_tasks` returns `Err` because `tokio::join!` collects the panicked JoinHandle. +#[tokio_shared_rt::test(shared)] +async fn test_run_tasks_external_hs_panic_propagates_via_guard() -> Result<()> { + let mut event_processor_list = setup().await?; + let (_shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false); + + for _ in 0..3 { + create_random_homeservers_and_persist( + &mut event_processor_list, + None, + MockEventProcessorResult::Success, + None, + shutdown_rx.clone(), + ) + .await; + } + + let inner = MockEventProcessorRunner::new(event_processor_list, 3, shutdown_rx.clone()); + let runner = Arc::new(PanickingExternalHsRunner { inner }); + + let result = NexusWatcher::run_tasks(shutdown_rx, runner, 100).await; + assert!( + result.is_err(), + "run_tasks should return Err when the external HS task panics" + ); + + Ok(()) +} diff --git a/nexus-watcher/tests/service/utils/mod.rs b/nexus-watcher/tests/service/utils/mod.rs index d7d140e28..59742b5f7 100644 --- a/nexus-watcher/tests/service/utils/mod.rs +++ b/nexus-watcher/tests/service/utils/mod.rs @@ -1,8 +1,10 @@ +mod panicking_runner; mod processor; mod processor_runner; mod result; mod setup; +pub use panicking_runner::{PanickingDefaultHsRunner, PanickingExternalHsRunner}; pub use processor::{ create_mock_event_processors, create_random_homeservers_and_persist, MockEventProcessor, }; diff --git a/nexus-watcher/tests/service/utils/panicking_runner.rs b/nexus-watcher/tests/service/utils/panicking_runner.rs new file mode 100644 index 000000000..803512afe --- /dev/null +++ b/nexus-watcher/tests/service/utils/panicking_runner.rs @@ -0,0 +1,73 @@ +use crate::service::utils::MockEventProcessorRunner; +use nexus_common::types::DynError; +use nexus_watcher::service::{ProcessedStats, TEventProcessor, TEventProcessorRunner}; +use std::sync::Arc; +use tokio::sync::watch::Receiver; + +/// Wraps a [`MockEventProcessorRunner`] but panics in `run_default_homeserver()`. +/// Used to test that a panic in the default HS task propagates via ShutdownGuard +/// and causes the external HS task to stop. +pub struct PanickingDefaultHsRunner { + pub inner: MockEventProcessorRunner, +} + +#[async_trait::async_trait] +impl TEventProcessorRunner for PanickingDefaultHsRunner { + fn shutdown_rx(&self) -> Receiver { + self.inner.shutdown_rx() + } + + fn default_homeserver(&self) -> &str { + self.inner.default_homeserver() + } + + fn monitored_homeservers_limit(&self) -> usize { + self.inner.monitored_homeservers_limit() + } + + async fn external_homeservers_by_priority(&self) -> Result, DynError> { + self.inner.external_homeservers_by_priority().await + } + + async fn build(&self, homeserver_id: String) -> Result, DynError> { + self.inner.build(homeserver_id).await + } + + async fn run_default_homeserver(&self) -> Result { + panic!("simulated default HS task crash"); + } +} + +/// Wraps a [`MockEventProcessorRunner`] but panics in `run_external_homeservers()`. +/// Used to test that a panic in the external HS task propagates via ShutdownGuard +/// and causes the default HS task to stop. +pub struct PanickingExternalHsRunner { + pub inner: MockEventProcessorRunner, +} + +#[async_trait::async_trait] +impl TEventProcessorRunner for PanickingExternalHsRunner { + fn shutdown_rx(&self) -> Receiver { + self.inner.shutdown_rx() + } + + fn default_homeserver(&self) -> &str { + self.inner.default_homeserver() + } + + fn monitored_homeservers_limit(&self) -> usize { + self.inner.monitored_homeservers_limit() + } + + async fn external_homeservers_by_priority(&self) -> Result, DynError> { + self.inner.external_homeservers_by_priority().await + } + + async fn build(&self, homeserver_id: String) -> Result, DynError> { + self.inner.build(homeserver_id).await + } + + async fn run_external_homeservers(&self) -> Result { + panic!("simulated external HS task crash"); + } +} From 45073f717af8e4b27b1a1fd180bc04e8af842d64 Mon Sep 17 00:00:00 2001 From: tipogi Date: Tue, 3 Mar 2026 10:18:37 +0100 Subject: [PATCH 2/7] simplify impl. with JoinSet --- nexus-watcher/src/service/mod.rs | 85 +++++++++++++------------------- 1 file changed, 35 insertions(+), 50 deletions(-) diff --git a/nexus-watcher/src/service/mod.rs b/nexus-watcher/src/service/mod.rs index f1b51a71b..6ca909e28 100644 --- a/nexus-watcher/src/service/mod.rs +++ b/nexus-watcher/src/service/mod.rs @@ -20,22 +20,10 @@ use nexus_common::{DaemonConfig, WatcherConfig}; use pubky_app_specs::PubkyId; use std::path::PathBuf; use std::sync::Arc; -use tokio::sync::watch::{Receiver, Sender}; +use tokio::sync::watch::Receiver; use tokio::time::Duration; use tracing::{debug, error, info}; -/// Sends a shutdown signal when dropped. Attach to each spawned task so that if any task -/// exits (normally or via panic), the remaining tasks are notified to stop gracefully. -struct ShutdownGuard(Option>); - -impl Drop for ShutdownGuard { - fn drop(&mut self) { - if let Some(tx) = self.0.take() { - let _ = tx.send(true); - } - } -} - pub struct NexusWatcher {} impl NexusWatcher { @@ -92,15 +80,15 @@ impl NexusWatcher { Self::run_tasks(shutdown_rx, Arc::new(runner), config.watcher_sleep).await } - /// Spawns processing tasks with [`ShutdownGuard`] protection and waits for completion. + /// Spawns processing tasks and waits for completion using a [`tokio::task::JoinSet`]. /// /// Three parallel tasks are spawned: /// 1. **Default homeserver task**: calls [`TEventProcessorRunner::run_default_homeserver`] each tick. /// 2. **External homeservers task**: calls [`TEventProcessorRunner::run_external_homeservers`] each tick. /// 3. **Shutdown forwarder task**: bridges the external `shutdown_rx` (e.g. SIGINT) into an internal channel. /// - /// Each task holds a [`ShutdownGuard`]. If any task exits or panics, the guard drops and - /// signals all remaining tasks to shut down gracefully via the internal channel. + /// When any task exits (normally or via panic), `JoinSet::join_next` returns and the + /// remaining tasks are signalled to shut down gracefully via the internal channel. /// /// Separated from [`Self::start`] to allow injection of mock runners in tests. pub async fn run_tasks( @@ -108,17 +96,14 @@ impl NexusWatcher { runner: Arc, watcher_sleep: u64, ) -> Result<(), DynError> { - // Internal channel: tasks signal each other via ShutdownGuard on exit/panic. - // The forwarder bridges the external SIGINT into this channel. let (internal_shutdown_tx, internal_shutdown_rx) = tokio::sync::watch::channel(false); + let mut set = tokio::task::JoinSet::new(); // Task 1: Default homeserver processing - let default_hs_handle = { + { let runner = runner.clone(); let mut shutdown = internal_shutdown_rx.clone(); - let guard = ShutdownGuard(Some(internal_shutdown_tx.clone())); - tokio::spawn(async move { - let _guard = guard; + set.spawn(async move { let mut interval = tokio::time::interval(Duration::from_millis(watcher_sleep)); loop { tokio::select! { @@ -135,16 +120,14 @@ impl NexusWatcher { } } } - }) - }; + }); + } // Task 2: External homeservers processing - let external_hss_handle = { + { let runner = runner.clone(); let mut shutdown = internal_shutdown_rx.clone(); - let guard = ShutdownGuard(Some(internal_shutdown_tx.clone())); - tokio::spawn(async move { - let _guard = guard; + set.spawn(async move { let mut interval = tokio::time::interval(Duration::from_millis(watcher_sleep)); loop { tokio::select! { @@ -161,17 +144,14 @@ impl NexusWatcher { } } } - }) - }; + }); + } - // Forwarder: bridges external SIGINT into the internal shutdown channel. - // Also listens to the internal channel so it exits when a sibling task crashes. - let forwarder_handle = { + // Task 3: Forwarder — bridges external SIGINT into the internal shutdown channel + { let mut external = shutdown_rx; let mut internal = internal_shutdown_rx.clone(); - let guard = ShutdownGuard(Some(internal_shutdown_tx.clone())); - tokio::spawn(async move { - let _guard = guard; + set.spawn(async move { tokio::select! { _ = external.changed() => { info!("SIGINT received, forwarding shutdown to watcher tasks"); @@ -180,25 +160,30 @@ impl NexusWatcher { info!("Internal shutdown received in forwarder, exiting"); } } - }) - }; - - // Drop the original sender so only task guards and the forwarder hold senders - drop(internal_shutdown_tx); + }); + } - let (r1, r2, r3) = tokio::join!(default_hs_handle, external_hss_handle, forwarder_handle); + // Block until the first task exits for any reason + let first = set.join_next().await; + let mut had_error = false; - if let Err(ref e) = r1 { - error!("Default homeserver task failed: {e}"); + if let Some(Err(ref e)) = first { + error!("Task failed: {e}"); + had_error = true; } - if let Err(ref e) = r2 { - error!("External homeservers task failed: {e}"); - } - if let Err(ref e) = r3 { - error!("Shutdown forwarder task failed: {e}"); + + // Signal the remaining tasks to stop gracefully + let _ = internal_shutdown_tx.send(true); + + // Drain remaining tasks + while let Some(result) = set.join_next().await { + if let Err(ref e) = result { + error!("Task failed: {e}"); + had_error = true; + } } - if r1.is_err() || r2.is_err() || r3.is_err() { + if had_error { return Err("Nexus Watcher stopped: one or more tasks failed".into()); } From 09cb41b0fdd5f0f2273e98d435cd308e095d910f Mon Sep 17 00:00:00 2001 From: tipogi Date: Wed, 4 Mar 2026 07:29:46 +0100 Subject: [PATCH 3/7] fix(watcher): prevent burst catch-up after slow rounds --- nexus-watcher/src/service/mod.rs | 2 + .../tests/service/missed_tick_skip.rs | 140 ++++++++++++++++++ nexus-watcher/tests/service/mod.rs | 1 + 3 files changed, 143 insertions(+) create mode 100644 nexus-watcher/tests/service/missed_tick_skip.rs diff --git a/nexus-watcher/src/service/mod.rs b/nexus-watcher/src/service/mod.rs index 6ca909e28..64ae9f147 100644 --- a/nexus-watcher/src/service/mod.rs +++ b/nexus-watcher/src/service/mod.rs @@ -105,6 +105,7 @@ impl NexusWatcher { let mut shutdown = internal_shutdown_rx.clone(); set.spawn(async move { let mut interval = tokio::time::interval(Duration::from_millis(watcher_sleep)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { tokio::select! { _ = shutdown.changed() => { @@ -129,6 +130,7 @@ impl NexusWatcher { let mut shutdown = internal_shutdown_rx.clone(); set.spawn(async move { let mut interval = tokio::time::interval(Duration::from_millis(watcher_sleep)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { tokio::select! { _ = shutdown.changed() => { diff --git a/nexus-watcher/tests/service/missed_tick_skip.rs b/nexus-watcher/tests/service/missed_tick_skip.rs new file mode 100644 index 000000000..01f9c68b7 --- /dev/null +++ b/nexus-watcher/tests/service/missed_tick_skip.rs @@ -0,0 +1,140 @@ +use anyhow::Result; +use nexus_common::models::event::EventProcessorError; +use nexus_common::types::DynError; +use nexus_watcher::service::{NexusWatcher, TEventProcessor, TEventProcessorRunner}; +use pubky_app_specs::PubkyId; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio::sync::watch::Receiver; + +const TEST_HS_ID: &str = "1hb71xx9km3f4pw5izsy1gn19ff1uuuqonw4mcygzobwkryujoiy"; + +/// Processor that sleeps on the first N invocations and is instant afterward. +/// This pattern is essential: `MissedTickBehavior::Skip` only produces +/// observable waiting when processing time drops *below* the interval after +/// a slow round. When every round exceeds the interval, Skip degenerates to +/// immediate-fire behavior identical to Burst. +struct VariableDelayProcessor { + call_count: AtomicUsize, + slow_first_n: usize, + slow_delay: Duration, + hs_id: PubkyId, +} + +#[async_trait::async_trait] +impl TEventProcessor for VariableDelayProcessor { + fn get_homeserver_id(&self) -> PubkyId { + self.hs_id.clone() + } + + async fn run_internal(self: Arc) -> Result<(), EventProcessorError> { + let n = self.call_count.fetch_add(1, Ordering::SeqCst); + if n < self.slow_first_n { + tokio::time::sleep(self.slow_delay).await; + } + Ok(()) + } +} + +/// Minimal runner that returns the shared `VariableDelayProcessor` on each +/// `build()` call. External homeservers are empty so only the default-HS +/// loop triggers builds. +struct VariableDelayRunner { + processor: Arc, + build_timestamps: std::sync::Mutex>, + shutdown_rx: Receiver, +} + +#[async_trait::async_trait] +impl TEventProcessorRunner for VariableDelayRunner { + fn shutdown_rx(&self) -> Receiver { + self.shutdown_rx.clone() + } + + fn default_homeserver(&self) -> &str { + TEST_HS_ID + } + + fn monitored_homeservers_limit(&self) -> usize { + 0 + } + + async fn external_homeservers_by_priority(&self) -> Result, DynError> { + Ok(vec![]) + } + + /// Records the instant of each `build()` call, then returns the shared processor. + /// + /// `NexusWatcher::run_tasks` calls `build()` on each interval tick for the + /// default homeserver loop. By collecting these timestamps we can assert that + /// calls stay spaced out (Skip) instead of clustering in burst catch-up mode. + async fn build(&self, _homeserver_id: String) -> Result, DynError> { + self.build_timestamps + .lock() + .unwrap() + .push(Instant::now()); + Ok(self.processor.clone()) + } +} + +/// Verifies that `run_tasks` uses `MissedTickBehavior::Skip`. +/// +/// The first processing round sleeps 500 ms (exceeding the 100 ms interval), +/// causing several ticks to be missed. Subsequent rounds are instant. +/// +/// With **Skip**, missed ticks are dropped and the loop resumes with regular +/// spacing, i.e. no immediate back-to-back `build()` calls. +/// +/// With **Burst** (tokio default), missed ticks fire immediately after the +/// slow round and create near-zero inter-build gaps. +#[tokio_shared_rt::test(shared)] +async fn test_no_burst_after_slow_processing_round() -> Result<()> { + let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false); + + let processor = Arc::new(VariableDelayProcessor { + call_count: AtomicUsize::new(0), + slow_first_n: 1, + slow_delay: Duration::from_millis(500), + hs_id: PubkyId::try_from(TEST_HS_ID).unwrap(), + }); + + let runner = Arc::new(VariableDelayRunner { + processor, + build_timestamps: std::sync::Mutex::new(Vec::new()), + shutdown_rx: shutdown_rx.clone(), + }); + + tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(1500)).await; + let _ = shutdown_tx.send(true); + }); + + let _ = NexusWatcher::run_tasks(shutdown_rx, runner.clone(), 100).await; + + let timestamps = runner.build_timestamps.lock().unwrap().clone(); + + assert!( + timestamps.len() >= 3, + "Expected at least 3 build() calls, got {}", + timestamps.len() + ); + + // We do not require perfect 100 ms cadence because test hosts can be noisy, + // but any "burst catch-up" would produce near-zero gaps. Keep a conservative + // floor that catches burst behavior without being CI-fragile. + let min_expected_non_burst_gap = Duration::from_millis(30); + + for i in 1..timestamps.len() { + let gap = timestamps[i].duration_since(timestamps[i - 1]); + assert!( + gap >= min_expected_non_burst_gap, + "Gap between build() call {} and {} was {gap:?}, expected >= {min_expected_non_burst_gap:?}. \ + This suggests burst catch-up ticks fired instead of being skipped.", + i - 1, + i, + ); + } + + Ok(()) +} diff --git a/nexus-watcher/tests/service/mod.rs b/nexus-watcher/tests/service/mod.rs index 528432cb8..cdf3d9f41 100644 --- a/nexus-watcher/tests/service/mod.rs +++ b/nexus-watcher/tests/service/mod.rs @@ -1,5 +1,6 @@ pub mod event_processing_multiple_homeservers; pub mod event_processor_prioritization; +pub mod missed_tick_skip; pub mod mock_event_processor; pub mod shutdown_guard; pub mod signal; From 5b04ac807b88ea8ff058439265a39fb4bc417f55 Mon Sep 17 00:00:00 2001 From: tipogi Date: Wed, 4 Mar 2026 09:55:52 +0100 Subject: [PATCH 4/7] ref: deduplicate processing loops and add task labels --- nexus-watcher/src/service/mod.rs | 143 ++++++++++-------- nexus-watcher/tests/service/shutdown_guard.rs | 10 +- .../tests/service/utils/panicking_runner.rs | 8 +- 3 files changed, 86 insertions(+), 75 deletions(-) diff --git a/nexus-watcher/src/service/mod.rs b/nexus-watcher/src/service/mod.rs index 64ae9f147..1a799f0f7 100644 --- a/nexus-watcher/src/service/mod.rs +++ b/nexus-watcher/src/service/mod.rs @@ -4,7 +4,6 @@ mod processor_runner; mod stats; mod traits; -/// Module exports pub use constants::{PROCESSING_TIMEOUT_SECS, WATCHER_CONFIG_FILE_NAME}; use nexus_common::types::DynError; pub use processor::EventProcessor; @@ -18,6 +17,7 @@ use nexus_common::models::homeserver::Homeserver; use nexus_common::utils::create_shutdown_rx; use nexus_common::{DaemonConfig, WatcherConfig}; use pubky_app_specs::PubkyId; +use std::future::Future; use std::path::PathBuf; use std::sync::Arc; use tokio::sync::watch::Receiver; @@ -97,81 +97,54 @@ impl NexusWatcher { watcher_sleep: u64, ) -> Result<(), DynError> { let (internal_shutdown_tx, internal_shutdown_rx) = tokio::sync::watch::channel(false); - let mut set = tokio::task::JoinSet::new(); + let mut set: tokio::task::JoinSet<&'static str> = tokio::task::JoinSet::new(); // Task 1: Default homeserver processing - { - let runner = runner.clone(); - let mut shutdown = internal_shutdown_rx.clone(); - set.spawn(async move { - let mut interval = tokio::time::interval(Duration::from_millis(watcher_sleep)); - interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - loop { - tokio::select! { - _ = shutdown.changed() => { - info!("Shutdown received, exiting default homeserver loop"); - break; - } - _ = interval.tick() => { - debug!("Indexing default homeserver…"); - _ = runner - .run_default_homeserver() - .await - .inspect_err(|e| error!("Failed to run default homeserver event processor: {e}")); - } - } - } - }); - } + Self::spawn_processing_loop( + &mut set, + runner.clone(), + internal_shutdown_rx.clone(), + watcher_sleep, + "default homeserver", + |runner| async move { runner.run_default_homeserver().await }, + ); // Task 2: External homeservers processing - { - let runner = runner.clone(); - let mut shutdown = internal_shutdown_rx.clone(); - set.spawn(async move { - let mut interval = tokio::time::interval(Duration::from_millis(watcher_sleep)); - interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - loop { - tokio::select! { - _ = shutdown.changed() => { - info!("Shutdown received, exiting external homeservers loop"); - break; - } - _ = interval.tick() => { - debug!("Indexing external homeservers…"); - _ = runner - .run_external_homeservers() - .await - .inspect_err(|e| error!("Failed to run external homeservers event processor: {e}")); - } - } - } - }); - } + Self::spawn_processing_loop( + &mut set, + runner.clone(), + internal_shutdown_rx.clone(), + watcher_sleep, + "external homeservers", + |runner| async move { runner.run_external_homeservers().await }, + ); // Task 3: Forwarder — bridges external SIGINT into the internal shutdown channel - { - let mut external = shutdown_rx; - let mut internal = internal_shutdown_rx.clone(); - set.spawn(async move { - tokio::select! { - _ = external.changed() => { - info!("SIGINT received, forwarding shutdown to watcher tasks"); - } - _ = internal.changed() => { - info!("Internal shutdown received in forwarder, exiting"); - } + set.spawn(async move { + let mut shutdown_rx = shutdown_rx; + let mut internal_shutdown_rx = internal_shutdown_rx; + tokio::select! { + _ = shutdown_rx.changed() => { + info!("SIGINT received, forwarding shutdown to watcher tasks"); } - }); - } + _ = internal_shutdown_rx.changed() => { + info!("Internal shutdown received in forwarder, exiting"); + } + } + "shutdown forwarder" + }); // Block until the first task exits for any reason let first = set.join_next().await; let mut had_error = false; - if let Some(Err(ref e)) = first { - error!("Task failed: {e}"); - had_error = true; + match &first { + Some(Ok(label)) => info!("First task to exit: {label}"), + Some(Err(e)) => { + error!("Task failed (panic/cancel): {e}"); + had_error = true; + } + None => unreachable!("JoinSet is non-empty"), } // Signal the remaining tasks to stop gracefully @@ -179,9 +152,12 @@ impl NexusWatcher { // Drain remaining tasks while let Some(result) = set.join_next().await { - if let Err(ref e) = result { - error!("Task failed: {e}"); - had_error = true; + match &result { + Ok(label) => info!("Task exited: {label}"), + Err(e) => { + error!("Task failed (panic/cancel): {e}"); + had_error = true; + } } } @@ -192,4 +168,37 @@ impl NexusWatcher { info!("Nexus Watcher shut down gracefully"); Ok(()) } + + fn spawn_processing_loop( + set: &mut tokio::task::JoinSet<&'static str>, + runner: Arc, + mut shutdown: Receiver, + watcher_sleep: u64, + label: &'static str, + process: F, + ) where + F: Fn(Arc) -> Fut + Send + 'static, + Fut: Future> + Send, + { + set.spawn(async move { + let mut interval = tokio::time::interval(Duration::from_millis(watcher_sleep)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + tokio::select! { + _ = shutdown.changed() => { + info!("Shutdown received, exiting {label} loop"); + break; + } + _ = interval.tick() => { + debug!("Indexing {label}…"); + _ = process(runner.clone()) + .await + .inspect_err(|e| error!("Failed to run {label} event processor: {e}")); + } + } + } + label + }); + } } diff --git a/nexus-watcher/tests/service/shutdown_guard.rs b/nexus-watcher/tests/service/shutdown_guard.rs index f114d3e1f..d7b219512 100644 --- a/nexus-watcher/tests/service/shutdown_guard.rs +++ b/nexus-watcher/tests/service/shutdown_guard.rs @@ -42,8 +42,9 @@ async fn test_run_tasks_clean_shutdown_via_external_signal() -> Result<()> { } /// Test B: A panic in `run_default_homeserver` causes the spawned task to crash. -/// The ShutdownGuard drops, signaling the external HS task and forwarder to stop. -/// `run_tasks` returns `Err` because `tokio::join!` collects the panicked JoinHandle. +/// `JoinSet::join_next` surfaces the panic, then the remaining tasks are signalled +/// to stop via the internal shutdown channel. +/// `run_tasks` returns `Err` because the panicked `JoinError` is observed during drain. #[tokio_shared_rt::test(shared)] async fn test_run_tasks_default_hs_panic_propagates_via_guard() -> Result<()> { let mut event_processor_list = setup().await?; @@ -73,8 +74,9 @@ async fn test_run_tasks_default_hs_panic_propagates_via_guard() -> Result<()> { } /// Test C: A panic in `run_external_homeservers` causes the spawned task to crash. -/// The ShutdownGuard drops, signaling the default HS task and forwarder to stop. -/// `run_tasks` returns `Err` because `tokio::join!` collects the panicked JoinHandle. +/// `JoinSet::join_next` surfaces the panic, then the remaining tasks are signalled +/// to stop via the internal shutdown channel. +/// `run_tasks` returns `Err` because the panicked `JoinError` is observed during drain. #[tokio_shared_rt::test(shared)] async fn test_run_tasks_external_hs_panic_propagates_via_guard() -> Result<()> { let mut event_processor_list = setup().await?; diff --git a/nexus-watcher/tests/service/utils/panicking_runner.rs b/nexus-watcher/tests/service/utils/panicking_runner.rs index 803512afe..e97385ce2 100644 --- a/nexus-watcher/tests/service/utils/panicking_runner.rs +++ b/nexus-watcher/tests/service/utils/panicking_runner.rs @@ -5,8 +5,8 @@ use std::sync::Arc; use tokio::sync::watch::Receiver; /// Wraps a [`MockEventProcessorRunner`] but panics in `run_default_homeserver()`. -/// Used to test that a panic in the default HS task propagates via ShutdownGuard -/// and causes the external HS task to stop. +/// Used to test that a panic in the default HS task is detected by the `JoinSet` +/// and causes the external HS task to stop via the internal shutdown channel. pub struct PanickingDefaultHsRunner { pub inner: MockEventProcessorRunner, } @@ -39,8 +39,8 @@ impl TEventProcessorRunner for PanickingDefaultHsRunner { } /// Wraps a [`MockEventProcessorRunner`] but panics in `run_external_homeservers()`. -/// Used to test that a panic in the external HS task propagates via ShutdownGuard -/// and causes the default HS task to stop. +/// Used to test that a panic in the external HS task is detected by the `JoinSet` +/// and causes the default HS task to stop via the internal shutdown channel. pub struct PanickingExternalHsRunner { pub inner: MockEventProcessorRunner, } From 092aeaca0fe30d449b6e9583f3b2a07225bad0e6 Mon Sep 17 00:00:00 2001 From: tipogi Date: Wed, 4 Mar 2026 11:14:43 +0100 Subject: [PATCH 5/7] chore: lint errors --- nexus-common/src/db/graph/queries/put.rs | 2 +- nexus-watcher/tests/service/missed_tick_skip.rs | 7 ++----- nexus-watcher/tests/service/shutdown_guard.rs | 5 ++++- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/nexus-common/src/db/graph/queries/put.rs b/nexus-common/src/db/graph/queries/put.rs index a1f8bdf72..23b2e81be 100644 --- a/nexus-common/src/db/graph/queries/put.rs +++ b/nexus-common/src/db/graph/queries/put.rs @@ -81,7 +81,7 @@ pub fn create_post( .param("content", post.content.to_string()) .param("indexed_at", post.indexed_at) .param("kind", kind.trim_matches('"')) - .param("attachments", post.attachments.clone().unwrap_or(vec![])); + .param("attachments", post.attachments.clone().unwrap_or_default()); // Handle "replied" relationship cypher_query = add_relationship_params( diff --git a/nexus-watcher/tests/service/missed_tick_skip.rs b/nexus-watcher/tests/service/missed_tick_skip.rs index 01f9c68b7..38790e41e 100644 --- a/nexus-watcher/tests/service/missed_tick_skip.rs +++ b/nexus-watcher/tests/service/missed_tick_skip.rs @@ -70,10 +70,7 @@ impl TEventProcessorRunner for VariableDelayRunner { /// default homeserver loop. By collecting these timestamps we can assert that /// calls stay spaced out (Skip) instead of clustering in burst catch-up mode. async fn build(&self, _homeserver_id: String) -> Result, DynError> { - self.build_timestamps - .lock() - .unwrap() - .push(Instant::now()); + self.build_timestamps.lock().unwrap().push(Instant::now()); Ok(self.processor.clone()) } } @@ -124,7 +121,7 @@ async fn test_no_burst_after_slow_processing_round() -> Result<()> { // but any "burst catch-up" would produce near-zero gaps. Keep a conservative // floor that catches burst behavior without being CI-fragile. let min_expected_non_burst_gap = Duration::from_millis(30); - + for i in 1..timestamps.len() { let gap = timestamps[i].duration_since(timestamps[i - 1]); assert!( diff --git a/nexus-watcher/tests/service/shutdown_guard.rs b/nexus-watcher/tests/service/shutdown_guard.rs index d7b219512..2eb6b1b4f 100644 --- a/nexus-watcher/tests/service/shutdown_guard.rs +++ b/nexus-watcher/tests/service/shutdown_guard.rs @@ -36,7 +36,10 @@ async fn test_run_tasks_clean_shutdown_via_external_signal() -> Result<()> { }); let result = NexusWatcher::run_tasks(shutdown_rx, runner, 100).await; - assert!(result.is_ok(), "run_tasks should return Ok on clean shutdown"); + assert!( + result.is_ok(), + "run_tasks should return Ok on clean shutdown" + ); Ok(()) } From 503711edaba8c74408abd16680fab418ede4131b Mon Sep 17 00:00:00 2001 From: tipogi Date: Wed, 4 Mar 2026 17:03:19 +0100 Subject: [PATCH 6/7] ref: inline join_next().await into match in run_tasks --- nexus-watcher/src/service/mod.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/nexus-watcher/src/service/mod.rs b/nexus-watcher/src/service/mod.rs index 1a799f0f7..d5d00e263 100644 --- a/nexus-watcher/src/service/mod.rs +++ b/nexus-watcher/src/service/mod.rs @@ -135,10 +135,9 @@ impl NexusWatcher { }); // Block until the first task exits for any reason - let first = set.join_next().await; let mut had_error = false; - match &first { + match set.join_next().await { Some(Ok(label)) => info!("First task to exit: {label}"), Some(Err(e)) => { error!("Task failed (panic/cancel): {e}"); From ad1a2e6b2271939cd67c316d5f63b0796503b6af Mon Sep 17 00:00:00 2001 From: tipogi Date: Fri, 6 Mar 2026 06:33:14 +0100 Subject: [PATCH 7/7] ref: simplify shutdown guard panic tests --- nexus-watcher/tests/service/shutdown_guard.rs | 150 +++++++++++------- nexus-watcher/tests/service/utils/mod.rs | 2 - .../tests/service/utils/panicking_runner.rs | 73 --------- 3 files changed, 92 insertions(+), 133 deletions(-) delete mode 100644 nexus-watcher/tests/service/utils/panicking_runner.rs diff --git a/nexus-watcher/tests/service/shutdown_guard.rs b/nexus-watcher/tests/service/shutdown_guard.rs index 2eb6b1b4f..fb97f207b 100644 --- a/nexus-watcher/tests/service/shutdown_guard.rs +++ b/nexus-watcher/tests/service/shutdown_guard.rs @@ -1,18 +1,75 @@ use crate::service::utils::{ create_random_homeservers_and_persist, setup, MockEventProcessorResult, - MockEventProcessorRunner, PanickingDefaultHsRunner, PanickingExternalHsRunner, + MockEventProcessorRunner, }; use anyhow::Result; -use nexus_watcher::service::NexusWatcher; +use nexus_common::types::DynError; +use nexus_watcher::service::{ + NexusWatcher, ProcessedStats, TEventProcessor, TEventProcessorRunner, +}; use std::sync::Arc; use std::time::Duration; +use tokio::sync::watch::Receiver; use tokio::time::sleep; -/// Test A: Sending an external shutdown signal causes `run_tasks` to return `Ok`. -/// Verifies the forwarder task bridges the external signal into the internal channel, -/// causing all processing tasks to exit gracefully. -#[tokio_shared_rt::test(shared)] -async fn test_run_tasks_clean_shutdown_via_external_signal() -> Result<()> { +const WATCHER_SLEEP_MS: u64 = 100; +const SHUTDOWN_DELAY_MS: u64 = 500; + +/// Controls which processing path should panic. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +enum PanicTarget { + DefaultHomeserver, + ExternalHomeservers, +} + +/// Wraps a [`MockEventProcessorRunner`] and panics in one selected run method. +struct PanickingRunner { + inner: MockEventProcessorRunner, + panic_target: PanicTarget, +} + +#[async_trait::async_trait] +impl TEventProcessorRunner for PanickingRunner { + fn shutdown_rx(&self) -> Receiver { + self.inner.shutdown_rx() + } + + fn default_homeserver(&self) -> &str { + self.inner.default_homeserver() + } + + fn monitored_homeservers_limit(&self) -> usize { + self.inner.monitored_homeservers_limit() + } + + async fn external_homeservers_by_priority(&self) -> Result, DynError> { + self.inner.external_homeservers_by_priority().await + } + + async fn build(&self, homeserver_id: String) -> Result, DynError> { + self.inner.build(homeserver_id).await + } + + async fn run_default_homeserver(&self) -> Result { + if matches!(self.panic_target, PanicTarget::DefaultHomeserver) { + panic!("simulated default HS task crash"); + } + self.inner.run_default_homeserver().await + } + + async fn run_external_homeservers(&self) -> Result { + if matches!(self.panic_target, PanicTarget::ExternalHomeservers) { + panic!("simulated external HS task crash"); + } + self.inner.run_external_homeservers().await + } +} + +async fn setup_runner_and_channel() -> Result<( + MockEventProcessorRunner, + tokio::sync::watch::Sender, + tokio::sync::watch::Receiver, +)> { let mut event_processor_list = setup().await?; let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false); @@ -28,19 +85,42 @@ async fn test_run_tasks_clean_shutdown_via_external_signal() -> Result<()> { } let runner = MockEventProcessorRunner::new(event_processor_list, 3, shutdown_rx.clone()); + Ok((runner, shutdown_tx, shutdown_rx)) +} + +async fn assert_panic_propagates(panic_target: PanicTarget) -> Result<()> { + let (inner, _shutdown_tx, shutdown_rx) = setup_runner_and_channel().await?; + let runner = Arc::new(PanickingRunner { + inner, + panic_target, + }); + + let result = NexusWatcher::run_tasks(shutdown_rx, runner, WATCHER_SLEEP_MS).await; + assert!( + result.is_err(), + "run_tasks should return Err when a task panics" + ); + Ok(()) +} + +/// Test A: Sending an external shutdown signal causes `run_tasks` to return `Ok`. +/// Verifies the forwarder task bridges the external signal into the internal channel, +/// causing all processing tasks to exit gracefully. +#[tokio_shared_rt::test(shared)] +async fn test_run_tasks_clean_shutdown_via_external_signal() -> Result<()> { + let (runner, shutdown_tx, shutdown_rx) = setup_runner_and_channel().await?; let runner = Arc::new(runner); tokio::spawn(async move { - sleep(Duration::from_millis(500)).await; + sleep(Duration::from_millis(SHUTDOWN_DELAY_MS)).await; let _ = shutdown_tx.send(true); }); - let result = NexusWatcher::run_tasks(shutdown_rx, runner, 100).await; + let result = NexusWatcher::run_tasks(shutdown_rx, runner, WATCHER_SLEEP_MS).await; assert!( result.is_ok(), "run_tasks should return Ok on clean shutdown" ); - Ok(()) } @@ -50,30 +130,7 @@ async fn test_run_tasks_clean_shutdown_via_external_signal() -> Result<()> { /// `run_tasks` returns `Err` because the panicked `JoinError` is observed during drain. #[tokio_shared_rt::test(shared)] async fn test_run_tasks_default_hs_panic_propagates_via_guard() -> Result<()> { - let mut event_processor_list = setup().await?; - let (_shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false); - - for _ in 0..3 { - create_random_homeservers_and_persist( - &mut event_processor_list, - None, - MockEventProcessorResult::Success, - None, - shutdown_rx.clone(), - ) - .await; - } - - let inner = MockEventProcessorRunner::new(event_processor_list, 3, shutdown_rx.clone()); - let runner = Arc::new(PanickingDefaultHsRunner { inner }); - - let result = NexusWatcher::run_tasks(shutdown_rx, runner, 100).await; - assert!( - result.is_err(), - "run_tasks should return Err when the default HS task panics" - ); - - Ok(()) + assert_panic_propagates(PanicTarget::DefaultHomeserver).await } /// Test C: A panic in `run_external_homeservers` causes the spawned task to crash. @@ -82,28 +139,5 @@ async fn test_run_tasks_default_hs_panic_propagates_via_guard() -> Result<()> { /// `run_tasks` returns `Err` because the panicked `JoinError` is observed during drain. #[tokio_shared_rt::test(shared)] async fn test_run_tasks_external_hs_panic_propagates_via_guard() -> Result<()> { - let mut event_processor_list = setup().await?; - let (_shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false); - - for _ in 0..3 { - create_random_homeservers_and_persist( - &mut event_processor_list, - None, - MockEventProcessorResult::Success, - None, - shutdown_rx.clone(), - ) - .await; - } - - let inner = MockEventProcessorRunner::new(event_processor_list, 3, shutdown_rx.clone()); - let runner = Arc::new(PanickingExternalHsRunner { inner }); - - let result = NexusWatcher::run_tasks(shutdown_rx, runner, 100).await; - assert!( - result.is_err(), - "run_tasks should return Err when the external HS task panics" - ); - - Ok(()) + assert_panic_propagates(PanicTarget::ExternalHomeservers).await } diff --git a/nexus-watcher/tests/service/utils/mod.rs b/nexus-watcher/tests/service/utils/mod.rs index 59742b5f7..d7d140e28 100644 --- a/nexus-watcher/tests/service/utils/mod.rs +++ b/nexus-watcher/tests/service/utils/mod.rs @@ -1,10 +1,8 @@ -mod panicking_runner; mod processor; mod processor_runner; mod result; mod setup; -pub use panicking_runner::{PanickingDefaultHsRunner, PanickingExternalHsRunner}; pub use processor::{ create_mock_event_processors, create_random_homeservers_and_persist, MockEventProcessor, }; diff --git a/nexus-watcher/tests/service/utils/panicking_runner.rs b/nexus-watcher/tests/service/utils/panicking_runner.rs deleted file mode 100644 index e97385ce2..000000000 --- a/nexus-watcher/tests/service/utils/panicking_runner.rs +++ /dev/null @@ -1,73 +0,0 @@ -use crate::service::utils::MockEventProcessorRunner; -use nexus_common::types::DynError; -use nexus_watcher::service::{ProcessedStats, TEventProcessor, TEventProcessorRunner}; -use std::sync::Arc; -use tokio::sync::watch::Receiver; - -/// Wraps a [`MockEventProcessorRunner`] but panics in `run_default_homeserver()`. -/// Used to test that a panic in the default HS task is detected by the `JoinSet` -/// and causes the external HS task to stop via the internal shutdown channel. -pub struct PanickingDefaultHsRunner { - pub inner: MockEventProcessorRunner, -} - -#[async_trait::async_trait] -impl TEventProcessorRunner for PanickingDefaultHsRunner { - fn shutdown_rx(&self) -> Receiver { - self.inner.shutdown_rx() - } - - fn default_homeserver(&self) -> &str { - self.inner.default_homeserver() - } - - fn monitored_homeservers_limit(&self) -> usize { - self.inner.monitored_homeservers_limit() - } - - async fn external_homeservers_by_priority(&self) -> Result, DynError> { - self.inner.external_homeservers_by_priority().await - } - - async fn build(&self, homeserver_id: String) -> Result, DynError> { - self.inner.build(homeserver_id).await - } - - async fn run_default_homeserver(&self) -> Result { - panic!("simulated default HS task crash"); - } -} - -/// Wraps a [`MockEventProcessorRunner`] but panics in `run_external_homeservers()`. -/// Used to test that a panic in the external HS task is detected by the `JoinSet` -/// and causes the default HS task to stop via the internal shutdown channel. -pub struct PanickingExternalHsRunner { - pub inner: MockEventProcessorRunner, -} - -#[async_trait::async_trait] -impl TEventProcessorRunner for PanickingExternalHsRunner { - fn shutdown_rx(&self) -> Receiver { - self.inner.shutdown_rx() - } - - fn default_homeserver(&self) -> &str { - self.inner.default_homeserver() - } - - fn monitored_homeservers_limit(&self) -> usize { - self.inner.monitored_homeservers_limit() - } - - async fn external_homeservers_by_priority(&self) -> Result, DynError> { - self.inner.external_homeservers_by_priority().await - } - - async fn build(&self, homeserver_id: String) -> Result, DynError> { - self.inner.build(homeserver_id).await - } - - async fn run_external_homeservers(&self) -> Result { - panic!("simulated external HS task crash"); - } -}